Merge github.com:google/grpc into async-api-new

pull/381/head
Craig Tiller 10 years ago
commit c230a7451d
  1. 18
      src/csharp/GrpcCore/Call.cs
  2. 5
      src/csharp/GrpcCore/GrpcCore.csproj
  3. 31
      src/csharp/GrpcCore/IMarshaller.cs
  4. 8
      src/csharp/GrpcCore/Internal/AsyncCall.cs
  5. 9
      src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
  6. 38
      src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
  7. 2
      src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
  8. 64
      src/csharp/GrpcCore/Method.cs
  9. 89
      src/csharp/GrpcCore/Server.cs
  10. 93
      src/csharp/GrpcCore/ServerCallHandler.cs
  11. 25
      src/csharp/GrpcCore/ServerCalls.cs
  12. 39
      src/csharp/GrpcCoreTests/ClientServerTest.cs
  13. 2
      src/csharp/GrpcCoreTests/ServerTest.cs

@ -8,10 +8,8 @@ namespace Google.GRPC.Core
readonly string methodName;
readonly Func<TRequest, byte[]> requestSerializer;
readonly Func<byte[], TResponse> responseDeserializer;
readonly TimeSpan timeout;
readonly Channel channel;
// TODO: channel param should be removed in the future.
public Call(string methodName,
Func<TRequest, byte[]> requestSerializer,
Func<byte[], TResponse> responseDeserializer,
@ -20,24 +18,22 @@ namespace Google.GRPC.Core
this.methodName = methodName;
this.requestSerializer = requestSerializer;
this.responseDeserializer = responseDeserializer;
this.timeout = timeout;
this.channel = channel;
}
public Channel Channel
public Call(Method<TRequest, TResponse> method, Channel channel)
{
get
{
return this.channel;
}
this.methodName = method.Name;
this.requestSerializer = method.RequestMarshaller.Serialize;
this.responseDeserializer = method.ResponseMarshaller.Deserialize;
this.channel = channel;
}
public TimeSpan Timeout
public Channel Channel
{
get
{
return this.timeout;
return this.channel;
}
}

@ -54,6 +54,11 @@
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="IMarshaller.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
<Compile Include="Internal\ServerWritingObserver.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,31 @@
using System;
namespace Google.GRPC.Core
{
/// <summary>
/// For serializing and deserializing messages.
/// </summary>
public interface IMarshaller<T>
{
byte[] Serialize(T value);
T Deserialize(byte[] payload);
}
/// <summary>
/// UTF-8 Marshalling for string. Useful for testing.
/// </summary>
internal class StringMarshaller : IMarshaller<string> {
public byte[] Serialize(string value)
{
return System.Text.Encoding.UTF8.GetBytes(value);
}
public string Deserialize(byte[] payload)
{
return System.Text.Encoding.UTF8.GetString(payload);
}
}
}

@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal
return StartRead().Task;
}
public Task Halfclosed
{
get
{
return halfcloseTcs.Task;
}
}
public Task<Status> Finished
{
get

@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal
[DllImport("libgrpc.so")]
static extern void grpc_server_shutdown(ServerSafeHandle server);
[DllImport("libgrpc.so")]
static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag);
[DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")]
static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("libgrpc.so")]
static extern void grpc_server_destroy(IntPtr server);
@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal
grpc_server_shutdown(this);
}
public void ShutdownAndNotify(EventCallbackDelegate callback)
{
grpc_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(EventCallbackDelegate callback)
{
return grpc_server_request_call_old_CALLBACK(this, callback);

@ -0,0 +1,38 @@
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
}
public void OnError(Exception error)
{
// TODO: handle this...
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
}
}
}

@ -1,7 +1,7 @@
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
namespace Google.GRPC.Core.Internal
{
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{

@ -0,0 +1,64 @@
using System;
namespace Google.GRPC.Core
{
public enum MethodType
{
Unary,
ClientStreaming,
ServerStreaming,
DuplexStreaming
}
/// <summary>
/// A description of a service method.
/// </summary>
public class Method<TRequest, TResponse>
{
readonly MethodType type;
readonly string name;
readonly IMarshaller<TRequest> requestMarshaller;
readonly IMarshaller<TResponse> responseMarshaller;
public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller)
{
this.type = type;
this.name = name;
this.requestMarshaller = requestMarshaller;
this.responseMarshaller = responseMarshaller;
}
public MethodType Type
{
get
{
return this.type;
}
}
public string Name
{
get
{
return this.name;
}
}
public IMarshaller<TRequest> RequestMarshaller
{
get
{
return this.requestMarshaller;
}
}
public IMarshaller<TResponse> ResponseMarshaller
{
get
{
return this.responseMarshaller;
}
}
}
}

@ -1,7 +1,9 @@
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
@ -15,10 +17,15 @@ namespace Google.GRPC.Core
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate newRpcHandler;
readonly EventCallbackDelegate serverShutdownHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
static Server() {
GrpcEnvironment.EnsureInitialized();
}
@ -28,8 +35,14 @@ namespace Google.GRPC.Core
// TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newRpcHandler = HandleNewRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
// only call before Start(), this will be in server builder in the future.
internal void AddCallHandler(string methodName, IServerCallHandler handler) {
callHandlers.Add(methodName, handler);
}
// only call before Start()
public int AddPort(string addr) {
return handle.AddPort(addr);
}
@ -37,49 +50,57 @@ namespace Google.GRPC.Core
public void Start()
{
handle.Start();
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
}
public void RunRpc()
/// <summary>
/// Requests and handles single RPC call.
/// </summary>
internal void RunRpc()
{
AllowOneRpc();
try {
var rpcInfo = newRpcQueue.Take();
Console.WriteLine("Server received RPC " + rpcInfo.Method);
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(rpcInfo.Call);
try
{
var rpcInfo = newRpcQueue.Take();
asyncCall.Accept(GetCompletionQueue());
Console.WriteLine("Server received RPC " + rpcInfo.Method);
while(true) {
byte[] payload = asyncCall.ReadAsync().Result;
if (payload == null)
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
{
break;
}
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
asyncCall.WriteAsync(new byte[] { }).Wait();
// TODO: what should be the details?
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
} catch(Exception e) {
catch(Exception e)
{
Console.WriteLine("Exception while handling RPC: " + e);
}
}
// TODO: implement disposal properly...
public void Shutdown() {
handle.Shutdown();
/// <summary>
/// Requests server shutdown and when there are no more calls being serviced,
/// cleans up used resources.
/// </summary>
/// <returns>The async.</returns>
public async Task ShutdownAsync() {
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
}
public void Kill() {
handle.Dispose();
}
//handle.Dispose();
private async Task StartHandlingRpcs() {
while (true)
{
await Task.Factory.StartNew(RunRpc);
}
}
private void AllowOneRpc()
@ -100,6 +121,18 @@ namespace Google.GRPC.Core
}
}
private void HandleServerShutdown(IntPtr eventPtr)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");

@ -0,0 +1,93 @@
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
internal interface IServerCallHandler
{
void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
}
internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
readonly UnaryRequestServerMethod<TRequest, TResponse> handler;
public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var request = asyncCall.ReadAsync().Result;
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
handler(request, responseObserver);
asyncCall.Halfclosed.Wait();
// TODO: wait until writing is finished
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}
internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
readonly StreamingRequestServerMethod<TRequest, TResponse> handler;
public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
// feed the requests
asyncCall.StartReadingToStream(requestObserver);
asyncCall.Halfclosed.Wait();
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}
internal class NoSuchMethodCallHandler : IServerCallHandler
{
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
asyncCall.Finished.Wait();
}
}
}

@ -0,0 +1,25 @@
using System;
namespace Google.GRPC.Core
{
// TODO: perhaps add also serverSideStreaming and clientSideStreaming
public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver);
public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver);
internal static class ServerCalls {
public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
{
return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler);
}
public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
{
return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler);
}
}
}

@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests
{
public class ClientServerTest
{
string request = "REQUEST";
string serverAddr = "localhost:" + Utils.PickUnusedPort();
private Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary,
"/tests.Test/UnaryEchoString",
new StringMarshaller(),
new StringMarshaller());
[Test]
public void EmptyCall()
{
Server server = new Server();
server.AddCallHandler(unaryEchoStringMethod.Name,
ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString));
server.AddPort(serverAddr);
server.Start();
Task.Factory.StartNew(
() => {
server.RunRpc();
}
);
using (Channel channel = new Channel(serverAddr))
{
CreateCall(channel);
string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken));
Console.WriteLine("Received response: " + response);
var call = CreateUnaryEchoStringCall(channel);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
}
server.Shutdown();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
private Call<string, string> CreateCall(Channel channel)
private Call<string, string> CreateUnaryEchoStringCall(Channel channel)
{
return new Call<string, string>("/tests.Test/EmptyCall",
(s) => System.Text.Encoding.ASCII.GetBytes(s),
(b) => System.Text.Encoding.ASCII.GetString(b),
Timeout.InfiniteTimeSpan, channel);
return new Call<string, string>(unaryEchoStringMethod, channel);
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
responseObserver.OnNext(request);
responseObserver.OnCompleted();
}
}
}

@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests
Server server = new Server();
server.AddPort("localhost:" + Utils.PickUnusedPort());
server.Start();
server.Shutdown();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}

Loading…
Cancel
Save