revert accidental change

pull/1369/head
Craig Tiller 10 years ago
parent 1a727fde47
commit 6f7030b9d7
  1. 360
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  2. 171
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  3. 357
      src/csharp/Grpc.Core/Server.cs
  4. 36
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  5. 7
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj

@ -1,11 +1,11 @@
#region Copyright notice and license #region Copyright notice and license
// Copyright 2015, Google Inc. // Copyright 2015, Google Inc.
// All rights reserved. // All rights reserved.
// //
// Redistribution and use in source and binary forms, with or without // Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are // modification, are permitted provided that the following conditions are
// met: // met:
// //
// * Redistributions of source code must retain the above copyright // * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer. // notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above // * Redistributions in binary form must reproduce the above
@ -15,7 +15,7 @@
// * Neither the name of Google Inc. nor the names of its // * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from // contributors may be used to endorse or promote products derived from
// this software without specific prior written permission. // this software without specific prior written permission.
// //
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -33,204 +33,160 @@ using System;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal {
internal delegate void CompletionCallbackDelegate(GRPCOpError error, namespace Grpc.Core.Internal
IntPtr batchContextPtr); {
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h> /// <summary>
/// </summary> /// grpc_call from <grpc/grpc.h>
internal class CallSafeHandle : SafeHandleZeroIsInvalid { /// </summary>
const uint GRPC_WRITE_BUFFER_HINT = 1; internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle const uint GRPC_WRITE_BUFFER_HINT = 1;
grpcsharp_channel_create_call(ChannelSafeHandle channel,
CompletionQueueSafeHandle cq, string method, [DllImport("grpc_csharp_ext.dll")]
string host, Timespec deadline); static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError [DllImport("grpc_csharp_ext.dll")]
grpcsharp_call_cancel(CallSafeHandle call); static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError [DllImport("grpc_csharp_ext.dll")]
grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
string description);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
grpcsharp_call_start_unary( [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
CallSafeHandle call, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback, [DllImport("grpc_csharp_ext.dll")]
byte[] send_buffer, UIntPtr send_buffer_len, static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
MetadataArraySafeHandle metadataArray); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_call_blocking_unary( [DllImport("grpc_csharp_ext.dll")]
CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
callback, MetadataArraySafeHandle metadataArray);
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
grpcsharp_call_start_client_streaming( byte[] send_buffer, UIntPtr send_buffer_len,
CallSafeHandle call, MetadataArraySafeHandle metadataArray);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback, [DllImport("grpc_csharp_ext.dll")]
MetadataArraySafeHandle metadataArray); static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError MetadataArraySafeHandle metadataArray);
grpcsharp_call_start_server_streaming(
CallSafeHandle call, [DllImport("grpc_csharp_ext.dll")]
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, byte[] send_buffer, UIntPtr send_buffer_len);
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
grpcsharp_call_start_duplex_streaming( [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate [DllImport("grpc_csharp_ext.dll")]
callback, static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
grpcsharp_call_send_message( [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate [DllImport("grpc_csharp_ext.dll")]
callback, static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
byte[] send_buffer, UIntPtr send_buffer_len); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError [DllImport("grpc_csharp_ext.dll")]
grpcsharp_call_send_close_from_client( static extern void grpcsharp_call_destroy(IntPtr call);
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate private CallSafeHandle()
callback); {
}
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_send_status_from_server( public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
CallSafeHandle call, {
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
callback, }
StatusCode statusCode, string statusMessage);
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError {
grpcsharp_call_recv_message( AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
CallSafeHandle call, }
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback); public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
grpcsharp_call_start_serverside( }
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
callback); {
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
[DllImport("grpc_csharp_ext.dll")] static extern void }
grpcsharp_call_destroy(IntPtr call);
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
private {
CallSafeHandle() {} AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
}
public
static CallSafeHandle Create(ChannelSafeHandle channel, public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
CompletionQueueSafeHandle cq, string method, {
string host, Timespec deadline) { AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
return grpcsharp_channel_create_call(channel, cq, method, host, deadline); }
}
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
public {
void StartUnary(byte[] payload, CompletionCallbackDelegate callback, AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
MetadataArraySafeHandle metadataArray) { }
AssertCallOk(grpcsharp_call_start_unary(
this, callback, payload, new UIntPtr((ulong)payload.Length), public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
metadataArray)); {
} AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
public
void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
CompletionCallbackDelegate callback, {
MetadataArraySafeHandle metadataArray) { AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, }
new UIntPtr((ulong)payload.Length),
metadataArray); public void StartReceiveMessage(CompletionCallbackDelegate callback)
} {
AssertCallOk(grpcsharp_call_recv_message(this, callback));
public }
void StartClientStreaming(CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) { public void StartServerSide(CompletionCallbackDelegate callback)
AssertCallOk( {
grpcsharp_call_start_client_streaming(this, callback, metadataArray)); AssertCallOk(grpcsharp_call_start_serverside(this, callback));
} }
public public void Cancel()
void StartServerStreaming(byte[] payload, {
CompletionCallbackDelegate callback, AssertCallOk(grpcsharp_call_cancel(this));
MetadataArraySafeHandle metadataArray) { }
AssertCallOk(grpcsharp_call_start_server_streaming(
this, callback, payload, new UIntPtr((ulong)payload.Length), public void CancelWithStatus(Status status)
metadataArray)); {
} AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
}
public
void StartDuplexStreaming(CompletionCallbackDelegate callback, protected override bool ReleaseHandle()
MetadataArraySafeHandle metadataArray) { {
AssertCallOk( grpcsharp_call_destroy(handle);
grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); return true;
} }
public private static void AssertCallOk(GRPCCallError callError)
void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { {
AssertCallOk(grpcsharp_call_send_message( Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
this, callback, payload, new UIntPtr((ulong)payload.Length))); }
}
private static uint GetFlags(bool buffered)
public {
void StartSendCloseFromClient(CompletionCallbackDelegate callback) { return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); }
}
public
void StartSendStatusFromServer(Status status,
CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_send_status_from_server(
this, callback, status.StatusCode, status.Detail));
}
public
void StartReceiveMessage(CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
public
void StartServerSide(CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public
void Cancel() { AssertCallOk(grpcsharp_call_cancel(this)); }
public
void CancelWithStatus(Status status) {
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode,
status.Detail));
}
protected
override bool ReleaseHandle() {
grpcsharp_call_destroy(handle);
return true;
}
private
static void AssertCallOk(GRPCCallError callError) {
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK,
"Status not GRPC_CALL_OK");
}
private
static uint GetFlags(bool buffered) {
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
} }
}
} }

@ -35,90 +35,91 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal {
// TODO: we need to make sure that the delegates are not collected before namespace Grpc.Core.Internal
// invoked. {
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); // TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
/// <summary>
/// grpc_server from grpc/grpc.h /// <summary>
/// </summary> /// grpc_server from grpc/grpc.h
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { /// </summary>
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
grpcsharp_server_request_call( {
ServerSafeHandle server, CompletionQueueSafeHandle cq, [DllImport("grpc_csharp_ext.dll")]
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
callback);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern int static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")] static extern int static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr,
ServerCredentialsSafeHandle creds); [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_server_start(ServerSafeHandle server); [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_server_shutdown(ServerSafeHandle server); // TODO: get rid of the old callback style
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")]
// TODO: get rid of the old callback style static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback);
[DllImport(
"grpc_csharp_ext.dll", [DllImport("grpc_csharp_ext.dll")]
EntryPoint = "grpcsharp_server_shutdown_and_notify")] static extern void static extern void grpcsharp_server_destroy(IntPtr server);
grpcsharp_server_shutdown_and_notify_CALLBACK(
ServerSafeHandle server, private ServerSafeHandle()
[MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate {
callback); }
[DllImport("grpc_csharp_ext.dll")] static extern void public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args)
grpcsharp_server_destroy(IntPtr server); {
return grpcsharp_server_create(cq, args);
private }
ServerSafeHandle() {}
public int AddListeningPort(string addr)
public {
static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, return grpcsharp_server_add_http2_port(this, addr);
IntPtr args) { }
return grpcsharp_server_create(cq, args);
} public int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials)
{
public return grpcsharp_server_add_secure_http2_port(this, addr, credentials);
int AddListeningPort(string addr) { }
return grpcsharp_server_add_http2_port(this, addr);
} public void Start()
{
public grpcsharp_server_start(this);
int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) { }
return grpcsharp_server_add_secure_http2_port(this, addr, credentials);
} public void Shutdown()
{
public grpcsharp_server_shutdown(this);
void Start() { grpcsharp_server_start(this); } }
public public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback)
void Shutdown() { grpcsharp_server_shutdown(this); } {
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
public }
void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) {
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
} {
AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
public }
GRPCCallError RequestCall(CompletionQueueSafeHandle cq,
CompletionCallbackDelegate callback) { protected override bool ReleaseHandle()
return grpcsharp_server_request_call(this, cq, callback); {
} grpcsharp_server_destroy(handle);
return true;
protected }
override bool ReleaseHandle() {
grpcsharp_server_destroy(handle); private static void AssertCallOk(GRPCCallError callError)
return true; {
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
} }
}
} }

@ -38,187 +38,216 @@ using System.Diagnostics;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading.Tasks; using System.Threading.Tasks;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core { namespace Grpc.Core
/// <summary> {
/// Server is implemented only to be able to do
/// in-process testing.
/// </summary>
public
class Server {
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue =
new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
readonly Dictionary<string, IServerCallHandler> callHandlers =
new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs =
new TaskCompletionSource<object>();
public
Server() {
this.handle =
ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
// only call this before Start()
public
void AddServiceDefinition(ServerServiceDefinition serviceDefinition) {
foreach (var entry in serviceDefinition.CallHandlers) {
callHandlers.Add(entry.Key, entry.Value);
}
}
// only call before Start()
public
int AddListeningPort(string addr) { return handle.AddListeningPort(addr); }
// only call before Start()
public
int AddListeningPort(string addr, ServerCredentials credentials) {
using(var nativeCredentials = credentials.ToNativeCredentials()) {
return handle.AddListeningPort(addr, nativeCredentials);
}
}
public
void Start() {
handle.Start();
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
}
/// <summary> /// <summary>
/// Requests and handles single RPC call. /// A gRPC server.
/// </summary> /// </summary>
internal void RunRpc() { public class Server
AllowOneRpc(); {
// TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
try { // native callbacks are in the completion queue.
var rpcInfo = newRpcQueue.Take(); readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
// Console.WriteLine("Server received RPC " + rpcInfo.Method);
readonly ServerSafeHandle handle;
IServerCallHandler callHandler; readonly object myLock = new object();
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) {
callHandler = new NoSuchMethodCallHandler(); readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
bool startRequested;
bool shutdownRequested;
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
} }
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call,
GetCompletionQueue());
} catch (Exception e) {
Console.WriteLine("Exception while handling RPC: " + e);
}
}
/// <summary> /// <summary>
/// Requests server shutdown and when there are no more calls being /// Adds a service definition to the server. This is how you register
/// serviced, /// handlers for a service with the server.
/// cleans up used resources. /// Only call this before Start().
/// </summary> /// </summary>
/// <returns>The async.</returns> public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
public {
async Task ShutdownAsync() { lock (myLock)
handle.ShutdownAndNotify(serverShutdownHandler); {
await shutdownTcs.Task; Preconditions.CheckState(!startRequested);
handle.Dispose(); foreach (var entry in serviceDefinition.CallHandlers)
} {
callHandlers.Add(entry.Key, entry.Value);
/// <summary> }
/// To allow awaiting termination of the server. }
/// </summary> }
public
Task ShutdownTask {
get { return shutdownTcs.Task; }
}
public /// <summary>
void Kill() { handle.Dispose(); } /// Add a non-secure port on which server should listen.
/// Only call this before Start().
/// </summary>
public int AddListeningPort(string addr)
{
lock (myLock)
{
Preconditions.CheckState(!startRequested);
return handle.AddListeningPort(addr);
}
}
private /// <summary>
async Task StartHandlingRpcs() { /// Add a secure port on which server should listen.
while (true) { /// Only call this before Start().
await Task.Factory.StartNew(RunRpc); /// </summary>
} public int AddListeningPort(string addr, ServerCredentials credentials)
} {
lock (myLock)
{
Preconditions.CheckState(!startRequested);
using (var nativeCredentials = credentials.ToNativeCredentials())
{
return handle.AddListeningPort(addr, nativeCredentials);
}
}
}
private /// <summary>
void AllowOneRpc() { /// Starts the server.
AssertCallOk( /// </summary>
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); public void Start()
} {
lock (myLock)
{
Preconditions.CheckState(!startRequested);
startRequested = true;
handle.Start();
AllowOneRpc();
}
}
private /// <summary>
void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { /// Requests server shutdown and when there are no more calls being serviced,
try { /// cleans up used resources. The returned task finishes when shutdown procedure
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); /// is complete.
/// </summary>
public async Task ShutdownAsync()
{
lock (myLock)
{
Preconditions.CheckState(startRequested);
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
}
if (error != GRPCOpError.GRPC_OP_OK) { /// <summary>
// TODO: handle error /// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
} }
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), public void Kill()
ctx.GetServerRpcNewMethod()); {
handle.Dispose();
}
// after server shutdown, the callback returns with null call /// <summary>
if (!rpcInfo.Call.IsInvalid) { /// Allows one new RPC call to be received by server.
newRpcQueue.Add(rpcInfo); /// </summary>
private void AllowOneRpc()
{
lock (myLock)
{
if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
}
}
} }
} catch (Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private /// <summary>
void HandleServerShutdown(IntPtr eventPtr) { /// Selects corresponding handler for given call and handles the call.
try { /// </summary>
shutdownTcs.SetResult(null); private void InvokeCallHandler(CallSafeHandle call, string method)
} catch (Exception e) { {
Console.WriteLine("Caught exception in a native handler: " + e); try
} {
} IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(method, call, GetCompletionQueue());
}
catch (Exception e)
{
Console.WriteLine("Exception while handling RPC: " + e);
}
}
private /// <summary>
static void AssertCallOk(GRPCCallError callError) { /// Handles the native callback.
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, /// </summary>
"Status not GRPC_CALL_OK"); private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
} {
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK)
{
// TODO: handle error
}
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
{
Task.Run(() => InvokeCallHandler(call, method));
}
AllowOneRpc();
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private /// <summary>
static CompletionQueueSafeHandle GetCompletionQueue() { /// Handles native callback.
return GrpcEnvironment.ThreadPool.CompletionQueue; /// </summary>
} /// <param name="eventPtr"></param>
private void HandleServerShutdown(IntPtr eventPtr)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private private static CompletionQueueSafeHandle GetCompletionQueue()
struct NewRpcInfo { {
private return GrpcEnvironment.ThreadPool.CompletionQueue;
CallSafeHandle call; }
private
string method;
public
NewRpcInfo(CallSafeHandle call, string method) {
this.call = call;
this.method = method;
}
public
CallSafeHandle Call {
get { return this.call; }
}
public
string Method {
get { return this.method; }
}
} }
}
} }

@ -34,26 +34,28 @@ using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
using Grpc.Core; using Grpc.Core;
namespace math { namespace math
class MainClass { {
public class MainClass
static void Main(string[] args) { {
String host = "0.0.0.0"; public static void Main(string[] args)
{
string host = "0.0.0.0";
GrpcEnvironment.Initialize(); GrpcEnvironment.Initialize();
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host + ":23456"); int port = server.AddListeningPort(host + ":23456");
server.Start(); server.Start();
Console.WriteLine("MathServer listening on port " + port); Console.WriteLine("MathServer listening on port " + port);
Console.WriteLine("Press any key to stop the server..."); Console.WriteLine("Press any key to stop the server...");
Console.ReadKey(); Console.ReadKey();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Shutdown();
} }
} }
} }

@ -16,8 +16,7 @@
<DebugType>full</DebugType> <DebugType>full</DebugType>
<Optimize>false</Optimize> <Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath> <OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG; <DefineConstants>DEBUG;</DefineConstants>
</DefineConstants>
<ErrorReport>prompt</ErrorReport> <ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
<Externalconsole>true</Externalconsole> <Externalconsole>true</Externalconsole>
@ -81,5 +80,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None> </None>
</ItemGroup> </ItemGroup>
<ItemGroup /> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
</Project> </Project>
Loading…
Cancel
Save