Merge pull request #968 from jtattermusch/csharp_refactoring

C# refactoring and code cleanup
pull/974/head
Michael Lumish 10 years ago
commit 4947638c62
  1. 1
      src/csharp/.gitignore
  2. 2
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  3. 2
      src/csharp/Grpc.Core/ChannelArgs.cs
  4. 7
      src/csharp/Grpc.Core/Grpc.Core.csproj
  5. 577
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  6. 407
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  7. 125
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  8. 95
      src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
  9. 56
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  10. 41
      src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
  11. 15
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  12. 45
      src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
  13. 48
      src/csharp/Grpc.Core/Internal/Timespec.cs
  14. 48
      src/csharp/Grpc.Core/OperationFailedException.cs
  15. 34
      src/csharp/Grpc.Core/ServerCallHandler.cs
  16. 62
      src/csharp/Grpc.Core/Status.cs
  17. 113
      src/csharp/Grpc.Core/Utils/Preconditions.cs
  18. 27
      src/csharp/Grpc.Examples.MathClient/MathClient.cs
  19. 128
      src/csharp/Grpc.Examples/MathExamples.cs

@ -1,4 +1,5 @@
*.userprefs
StyleCop.Cache
test-results
packages
Grpc.v12.suo

@ -127,8 +127,6 @@ namespace Grpc.Core.Tests
[Test]
public void NopPInvokeBenchmark()
{
CompletionCallbackDelegate handler = Handler;
BenchmarkUtil.RunBenchmark(
1000000, 100000000,
() => {

@ -99,7 +99,7 @@ namespace Grpc.Core
}
return nativeArgs;
}
catch (Exception e)
catch (Exception)
{
if (nativeArgs != null)
{

@ -51,7 +51,6 @@
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
@ -69,6 +68,12 @@
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="ChannelArgs.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="OperationFailedException.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Utils\Preconditions.cs" />
</ItemGroup>
<Choose>
<!-- Under older versions of Monodevelop, Choose is not supported and is just

@ -43,84 +43,47 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Handles native call lifecycle and provides convenience methods.
/// Handles client side native call lifecycle.
/// </summary>
internal class AsyncCall<TWrite, TRead>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
readonly CompletionCallbackDelegate writeFinishedHandler;
readonly CompletionCallbackDelegate readFinishedHandler;
readonly CompletionCallbackDelegate halfclosedHandler;
readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
GCHandle gchandle;
CallSafeHandle call;
bool disposed;
bool server;
bool started;
bool errorOccured;
bool cancelRequested;
bool readingDone;
bool halfcloseRequested;
bool halfclosed;
bool finished;
// Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
// Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
// Completion of a pending halfclose if not null.
TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
TaskCompletionSource<TRead> unaryResponseTcs;
TaskCompletionSource<TResponse> unaryResponseTcs;
// Set after status is received on client. Only used for server streaming and duplex streaming calls.
// Set after status is received. Only used for streaming response calls.
Nullable<Status> finishedStatus;
TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
// For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
bool readObserverCompleted; // True if readObserver has already been completed.
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.unaryResponseHandler = HandleUnaryResponse;
this.finishedHandler = HandleFinished;
this.writeFinishedHandler = HandleWriteFinished;
this.readFinishedHandler = HandleReadFinished;
this.halfclosedHandler = HandleHalfclosed;
this.finishedServersideHandler = HandleFinishedServerside;
this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
InitializeInternal(call);
}
public void InitializeServer(CallSafeHandle call)
{
InitializeInternal(call, true);
}
public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
// TODO: for other calls, you need to call Initialize, this methods calls initialize
// on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
public TResponse UnaryCall(Channel channel, String methodName, TRequest msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
// TODO: handle serialization error...
byte[] payload = serializer(msg);
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TRead>();
unaryResponseTcs = new TaskCompletionSource<TResponse>();
lock (myLock)
{
@ -143,508 +106,200 @@ namespace Grpc.Core.Internal
}
}
public Task<TRead> UnaryCallAsync(TWrite msg)
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
halfcloseRequested = true;
readingDone = true;
// TODO: handle serialization error...
byte[] payload = serializer(msg);
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TRead>();
unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
public Task<TRead> ClientStreamingCallAsync()
/// <summary>
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TRead>();
unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
this.readObserver = readObserver;
// TODO: handle serialization error...
byte[] payload = serializer(msg);
byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler);
ReceiveMessageAsync();
StartReceiveMessage();
}
}
public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
/// <summary>
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public void StartDuplexStreamingCall(IObserver<TResponse> readObserver)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
ReceiveMessageAsync();
StartReceiveMessage();
}
}
public Task ServerSideUnaryRequestCallAsync()
{
lock (myLock)
{
started = true;
call.StartServerSide(finishedServersideHandler);
return finishedServersideTcs.Task;
}
}
public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
{
lock (myLock)
{
started = true;
call.StartServerSide(finishedServersideHandler);
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
ReceiveMessageAsync();
return finishedServersideTcs.Task;
}
}
public Task SendMessageAsync(TWrite msg)
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNoError();
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
if (writeTcs != null)
{
throw new InvalidOperationException("Only one write can be pending at a time");
}
// TODO: wrap serialization...
byte[] payload = serializer(msg);
call.StartSendMessage(payload, writeFinishedHandler);
writeTcs = new TaskCompletionSource<object>();
return writeTcs.Task;
}
StartSendMessageInternal(msg, completionDelegate);
}
public Task SendCloseFromClientAsync()
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNoError();
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
public Task SendStatusFromServerAsync(Status status)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNoError();
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
sendCompletionDelegate = completionDelegate;
}
}
public Task<TRead> ReceiveMessageAsync()
/// <summary>
/// On client-side, we only fire readObserver.OnCompleted once all messages have been read
/// and status has been received.
/// </summary>
protected override void CompleteReadObserver()
{
lock (myLock)
if (readingDone && finishedStatus.HasValue)
{
CheckNotDisposed();
CheckStarted();
CheckNoError();
if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
if (readTcs != null)
bool shouldComplete;
lock (myLock)
{
throw new InvalidOperationException("Only one read can be pending at a time");
shouldComplete = !readObserverCompleted;
readObserverCompleted = true;
}
call.StartReceiveMessage(readFinishedHandler);
readTcs = new TaskCompletionSource<TRead>();
return readTcs.Task;
}
}
public void Cancel()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
call.Cancel();
}
public void CancelWithStatus(Status status)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
private void InitializeInternal(CallSafeHandle call, bool server)
{
lock (myLock)
{
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
this.server = server;
}
}
private void CheckStarted()
{
if (!started)
{
throw new InvalidOperationException("Call not started");
}
}
private void CheckNotDisposed()
{
if (disposed)
{
throw new InvalidOperationException("Call has already been disposed.");
}
}
private void CheckNoError()
{
if (errorOccured)
{
throw new InvalidOperationException("Error occured when processing call.");
}
}
private bool ReleaseResourcesIfPossible()
{
if (!disposed && call != null)
{
if (halfclosed && readingDone && finished)
if (shouldComplete)
{
ReleaseResources();
return true;
var status = finishedStatus.Value;
if (status.StatusCode != StatusCode.OK)
{
FireReadObserverOnError(new RpcException(status));
}
else
{
FireReadObserverOnCompleted();
}
}
}
return false;
}
private void ReleaseResources()
{
if (call != null) {
call.Dispose();
}
gchandle.Free();
disposed = true;
}
private void CompleteStreamObserver(Status status)
{
if (status.StatusCode != StatusCode.OK)
{
// TODO: wrap to handle exceptions;
readObserver.OnError(new RpcException(status));
} else {
// TODO: wrap to handle exceptions;
readObserver.OnCompleted();
}
}
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
try
lock(myLock)
{
TaskCompletionSource<TRead> tcs;
lock(myLock)
{
finished = true;
halfclosed = true;
tcs = unaryResponseTcs;
ReleaseResourcesIfPossible();
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
finished = true;
halfclosed = true;
if (error != GRPCOpError.GRPC_OP_OK)
{
tcs.SetException(new RpcException(
new Status(StatusCode.Internal, "Internal error occured.")
));
return;
}
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.OK)
{
tcs.SetException(new RpcException(status));
return;
}
// TODO: handle deserialize error...
var msg = deserializer(ctx.GetReceivedMessage());
tcs.SetResult(msg);
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
ReleaseResourcesIfPossible();
}
}
private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
oldTcs = writeTcs;
writeTcs = null;
}
if (errorOccured)
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
}
}
catch(Exception e)
if (wasError)
{
Console.WriteLine("Caught exception in a native handler: " + e);
unaryResponseTcs.SetException(new RpcException(
new Status(StatusCode.Internal, "Internal error occured.")
));
return;
}
}
private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
lock (myLock)
{
halfclosed = true;
ReleaseResourcesIfPossible();
}
if (error != GRPCOpError.GRPC_OP_OK)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
}
else
{
halfcloseTcs.SetResult(null);
}
}
catch(Exception e)
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.OK)
{
Console.WriteLine("Caught exception in a native handler: " + e);
unaryResponseTcs.SetException(new RpcException(status));
return;
}
}
private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var payload = ctx.GetReceivedMessage();
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
Nullable<Status> status = null;
lock (myLock)
{
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
readingDone = true;
}
observer = readObserver;
status = finishedStatus;
ReleaseResourcesIfPossible();
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
oldTcs.SetResult(msg);
// TODO: handle deserialization error
TResponse msg;
TryDeserialize(ctx.GetReceivedMessage(), out msg);
// TODO: make sure we deliver reads in the right order.
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
// start a new read
ReceiveMessageAsync();
}
else
{
if (!server)
{
if (status.HasValue)
{
CompleteStreamObserver(status.Value);
}
}
else
{
// TODO: wrap to handle exceptions..
observer.OnCompleted();
}
// TODO: completeStreamObserver serverside...
}
}
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
unaryResponseTcs.SetResult(msg);
}
private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var status = ctx.GetReceivedStatus();
bool wasReadingDone;
lock (myLock)
{
finished = true;
finishedStatus = status;
wasReadingDone = readingDone;
ReleaseResourcesIfPossible();
}
if (wasReadingDone) {
CompleteStreamObserver(status);
}
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
var status = ctx.GetReceivedStatus();
private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
{
try
lock (myLock)
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
lock(myLock)
{
finished = true;
// TODO: because of the way server calls are implemented, we need to set
// reading done to true here. Should be fixed in the future.
readingDone = true;
ReleaseResourcesIfPossible();
}
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
finished = true;
finishedStatus = status;
ReleaseResourcesIfPossible();
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
CompleteReadObserver();
}
}
}

@ -0,0 +1,407 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Base for handling both client side and server side calls.
/// Handles native call lifecycle and provides convenience methods.
/// </summary>
internal abstract class AsyncCallBase<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
protected readonly CompletionCallbackDelegate sendFinishedHandler;
protected readonly CompletionCallbackDelegate readFinishedHandler;
protected readonly CompletionCallbackDelegate halfclosedHandler;
protected readonly object myLock = new object();
protected GCHandle gchandle;
protected CallSafeHandle call;
protected bool disposed;
protected bool started;
protected bool errorOccured;
protected bool cancelRequested;
protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected bool readPending; // True if there is a read in progress.
protected bool readingDone;
protected bool halfcloseRequested;
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
// Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null.
protected IObserver<TRead> readObserver;
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
/// Requests cancelling the call.
/// </summary>
public void Cancel()
{
lock (myLock)
{
Preconditions.CheckState(started);
cancelRequested = true;
if (!disposed)
{
call.Cancel();
}
}
}
/// <summary>
/// Requests cancelling the call with given status.
/// </summary>
public void CancelWithStatus(Status status)
{
lock (myLock)
{
Preconditions.CheckState(started);
cancelRequested = true;
if (!disposed)
{
call.CancelWithStatus(status);
}
}
}
protected void InitializeInternal(CallSafeHandle call)
{
lock (myLock)
{
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
}
}
/// <summary>
/// Initiates sending a message. Only once send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(payload, sendFinishedHandler);
sendCompletionDelegate = completionDelegate;
}
}
/// <summary>
/// Requests receiving a next message.
/// </summary>
protected void StartReceiveMessage()
{
lock (myLock)
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone);
Preconditions.CheckState(!readPending);
call.StartReceiveMessage(readFinishedHandler);
readPending = true;
}
}
/// <summary>
/// Default behavior just completes the read observer, but more sofisticated behavior might be required
/// by subclasses.
/// </summary>
protected virtual void CompleteReadObserver()
{
FireReadObserverOnCompleted();
}
/// <summary>
/// If there are no more pending actions and no new actions can be started, releases
/// the underlying native resources.
/// </summary>
protected bool ReleaseResourcesIfPossible()
{
if (!disposed && call != null)
{
if (halfclosed && readingDone && finished)
{
ReleaseResources();
return true;
}
}
return false;
}
private void ReleaseResources()
{
if (call != null)
{
call.Dispose();
}
gchandle.Free();
disposed = true;
}
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected byte[] UnsafeSerialize(TWrite msg)
{
return serializer(msg);
}
protected bool TrySerialize(TWrite msg, out byte[] payload)
{
try
{
payload = serializer(msg);
return true;
}
catch(Exception)
{
Console.WriteLine("Exception occured while trying to serialize message");
payload = null;
return false;
}
}
protected bool TryDeserialize(byte[] payload, out TRead msg)
{
try
{
msg = deserializer(payload);
return true;
}
catch(Exception)
{
Console.WriteLine("Exception occured while trying to deserialize message");
msg = default(TRead);
return false;
}
}
protected void FireReadObserverOnNext(TRead value)
{
try
{
readObserver.OnNext(value);
}
catch(Exception e)
{
Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e);
}
}
protected void FireReadObserverOnCompleted()
{
try
{
readObserver.OnCompleted();
}
catch(Exception e)
{
Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e);
}
}
protected void FireReadObserverOnError(Exception error)
{
try
{
readObserver.OnError(error);
}
catch(Exception e)
{
Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e);
}
}
protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error)
{
try
{
completionDelegate(error);
}
catch(Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
}
/// <summary>
/// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
/// prevent propagating exceptions accross managed/unmanaged boundary.
/// </summary>
protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
{
return new CompletionCallbackDelegate( (error, batchContextPtr) => {
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
bool wasError = (error != GRPCOpError.GRPC_OP_OK);
handler(wasError, ctx);
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
});
}
/// <summary>
/// Handles send completion.
/// </summary>
private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
AsyncCompletionDelegate origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible();
}
if (wasError)
{
FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed"));
}
else
{
FireCompletion(origCompletionDelegate, null);
}
}
/// <summary>
/// Handles halfclose completion.
/// </summary>
private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
AsyncCompletionDelegate origCompletionDelegate = null;
lock (myLock)
{
halfclosed = true;
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible();
}
if (wasError)
{
FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed"));
}
else
{
FireCompletion(origCompletionDelegate, null);
}
}
/// <summary>
/// Handles streaming read completion.
/// </summary>
private void HandleReadFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
var payload = ctx.GetReceivedMessage();
lock (myLock)
{
readPending = false;
if (payload == null)
{
readingDone = true;
}
ReleaseResourcesIfPossible();
}
// TODO: handle the case when error occured...
if (payload != null)
{
// TODO: handle deserialization error
TRead msg;
TryDeserialize(payload, out msg);
FireReadObserverOnNext(msg);
// Start a new read. The current one has already been delivered,
// so correct ordering of reads is assured.
StartReceiveMessage();
}
else
{
CompleteReadObserver();
}
}
}
}

@ -0,0 +1,125 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Handles server side native call lifecycle.
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
{
InitializeInternal(call);
}
/// <summary>
/// Starts a server side call. Currently, all server side calls are implemented as duplex
/// streaming call and they are adapted to the appropriate streaming arity.
/// </summary>
public Task ServerSideCallAsync(IObserver<TRequest> readObserver)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
this.readObserver = readObserver;
call.StartServerSide(finishedServersideHandler);
StartReceiveMessage();
return finishedServersideTcs.Task;
}
}
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
}
/// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
}
}
/// <summary>
/// Handles the server side close completion.
/// </summary>
private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
lock (myLock)
{
finished = true;
ReleaseResourcesIfPossible();
}
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
}
}
}

@ -0,0 +1,95 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// If error != null, there's been an error or operation has been cancelled.
/// </summary>
internal delegate void AsyncCompletionDelegate(Exception error);
/// <summary>
/// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
/// </summary>
internal class AsyncCompletionTaskSource
{
readonly TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
readonly AsyncCompletionDelegate completionDelegate;
public AsyncCompletionTaskSource()
{
completionDelegate = new AsyncCompletionDelegate(HandleCompletion);
}
public Task Task
{
get
{
return tcs.Task;
}
}
public AsyncCompletionDelegate CompletionDelegate
{
get
{
return completionDelegate;
}
}
private void HandleCompletion(Exception error)
{
if (error == null)
{
tcs.SetResult(null);
return;
}
if (error is OperationCanceledException)
{
tcs.SetCanceled();
return;
}
tcs.SetException(error);
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -30,7 +29,6 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
@ -38,14 +36,12 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
//TODO: rename the delegate
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
internal delegate void CompletionCallbackDelegate(GRPCOpError error,IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
const UInt32 GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
@ -59,22 +55,22 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@ -82,28 +78,27 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
private CallSafeHandle()
{
}
@ -115,12 +110,12 @@ namespace Grpc.Core.Internal
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length));
}
public void StartClientStreaming(CompletionCallbackDelegate callback)
@ -130,7 +125,7 @@ namespace Grpc.Core.Internal
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length)));
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void StartDuplexStreaming(CompletionCallbackDelegate callback)
@ -140,7 +135,7 @@ namespace Grpc.Core.Internal
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length)));
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
@ -173,19 +168,20 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
}
protected override bool ReleaseHandle()
{
protected override bool ReleaseHandle()
{
grpcsharp_call_destroy(handle);
return true;
}
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static UInt32 GetFlags(bool buffered) {
private static UInt32 GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,40 +27,40 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
namespace Grpc.Core.Internal
{
internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{
{
readonly AsyncCall<TWrite, TRead> call;
public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call)
{
{
this.call = call;
}
public void OnCompleted()
{
}
public void OnCompleted()
{
var taskSource = new AsyncCompletionTaskSource();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
call.SendCloseFromClientAsync().Wait();
}
taskSource.Task.Wait();
}
public void OnError(Exception error)
{
throw new InvalidOperationException("This should never be called.");
}
public void OnError(Exception error)
{
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
public void OnNext(TWrite value)
{
var taskSource = new AsyncCompletionTaskSource();
call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
call.SendMessageAsync(value).Wait();
}
}
taskSource.Task.Wait();
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@ -40,8 +37,8 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_completion_queue from <grpc/grpc.h>
/// </summary>
internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
{
internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
@ -73,11 +70,11 @@ namespace Grpc.Core.Internal
grpcsharp_completion_queue_shutdown(this);
}
protected override bool ReleaseHandle()
protected override bool ReleaseHandle()
{
grpcsharp_completion_queue_destroy(handle);
return true;
}
}
return true;
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
@ -40,32 +37,36 @@ namespace Grpc.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse>
{
readonly AsyncCallServer<TRequest, TResponse> call;
public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
{
public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
}
public void OnCompleted()
{
public void OnCompleted()
{
var taskSource = new AsyncCompletionTaskSource();
call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait();
}
taskSource.Task.Wait();
}
public void OnError(Exception error)
{
public void OnError(Exception error)
{
// TODO: implement this...
throw new InvalidOperationException("This should never be called.");
}
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
public void OnNext(TResponse value)
{
var taskSource = new AsyncCompletionTaskSource();
call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
call.SendMessageAsync(value).Wait();
}
}
taskSource.Task.Wait();
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,21 +27,19 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading;
namespace Grpc.Core.Internal
{
/// <summary>
/// gpr_timespec from grpc/support/time.h
/// </summary>
[StructLayout(LayoutKind.Sequential)]
internal struct Timespec
{
/// <summary>
/// gpr_timespec from grpc/support/time.h
/// </summary>
[StructLayout(LayoutKind.Sequential)]
internal struct Timespec
{
const int nanosPerSecond = 1000 * 1000 * 1000;
const int nanosPerTick = 100;
@ -54,23 +51,22 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
// TODO: revisit this.
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;
public System.IntPtr tv_nsec;
public System.IntPtr tv_sec;
public System.IntPtr tv_nsec;
/// <summary>
/// Timespec a long time in the future.
/// </summary>
public static Timespec InfFuture
{
get
{
/// <summary>
/// Timespec a long time in the future.
/// </summary>
public static Timespec InfFuture
{
get
{
return gprsharp_inf_future();
}
}
}
}
public static Timespec Now
{
@ -92,7 +88,8 @@ namespace Grpc.Core.Internal
/// Creates a GPR deadline from current instant and given timeout.
/// </summary>
/// <returns>The from timeout.</returns>
public static Timespec DeadlineFromTimeout(TimeSpan timeout) {
public static Timespec DeadlineFromTimeout(TimeSpan timeout)
{
if (timeout == Timeout.InfiniteTimeSpan)
{
return Timespec.InfFuture;
@ -100,7 +97,8 @@ namespace Grpc.Core.Internal
return Timespec.Now.Add(timeout);
}
public Timespec Add(TimeSpan timeSpan) {
public Timespec Add(TimeSpan timeSpan)
{
long nanos = tv_nsec.ToInt64() + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * nanosPerTick;
long overflow_sec = (nanos > nanosPerSecond) ? 1 : 0;
@ -109,6 +107,6 @@ namespace Grpc.Core.Internal
result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec);
return result;
}
}
}
}

@ -0,0 +1,48 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Thrown when gRPC operation fails.
/// </summary>
public class OperationFailedException : Exception
{
public OperationFailedException(string message) : base(message)
{
}
}
}

@ -32,7 +32,9 @@
#endregion
using System;
using System.Linq;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
@ -54,17 +56,17 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
var requestObserver = new RecordingObserver<TRequest>();
var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
var request = asyncCall.ReceiveMessageAsync().Result;
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var request = requestObserver.ToList().Result.Single();
var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall);
handler(request, responseObserver);
finishedTask.Wait();
@ -85,15 +87,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Initialize(call);
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var responseObserver = new ServerStreamingOutputObserver<TRequest,TResponse>(asyncCall);
var requestObserver = handler(responseObserver);
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
finishedTask.Wait();
}
}
@ -103,17 +105,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.Initialize(call);
asyncCall.InitializeServer(call);
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>());
// TODO: this makes the call finish before all reads can be done which causes trouble
// in AsyncCall.HandleReadFinished callback. Revisit this.
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait();
// TODO: check result of the completion status.
asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => {}));
finishedTask.Wait();
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,7 +27,6 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
@ -36,34 +34,40 @@ using System.Runtime.InteropServices;
namespace Grpc.Core
{
/// <summary>
/// Represents RPC result.
/// </summary>
public struct Status
{
readonly StatusCode statusCode;
readonly string detail;
/// <summary>
/// Represents RPC result.
/// </summary>
public struct Status
{
readonly StatusCode statusCode;
readonly string detail;
public Status(StatusCode statusCode, string detail)
{
this.statusCode = statusCode;
this.detail = detail;
}
public Status(StatusCode statusCode, string detail)
{
this.statusCode = statusCode;
this.detail = detail;
}
public StatusCode StatusCode
{
get
{
return statusCode;
}
}
/// <summary>
/// Gets the gRPC status code. OK indicates success, all other values indicate an error.
/// </summary>
public StatusCode StatusCode
{
get
{
return statusCode;
}
}
public string Detail
{
get
{
return detail;
}
}
}
/// <summary>
/// Gets the detail.
/// </summary>
public string Detail
{
get
{
return detail;
}
}
}
}

@ -0,0 +1,113 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace Grpc.Core.Utils
{
public static class Preconditions
{
/// <summary>
/// Throws ArgumentException if condition is false.
/// </summary>
public static void CheckArgument(bool condition)
{
if (!condition)
{
throw new ArgumentException();
}
}
/// <summary>
/// Throws ArgumentException with given message if condition is false.
/// </summary>
public static void CheckArgument(bool condition, string errorMessage)
{
if (!condition)
{
throw new ArgumentException(errorMessage);
}
}
/// <summary>
/// Throws NullReferenceException if reference is null.
/// </summary>
public static T CheckNotNull<T> (T reference)
{
if (reference == null)
{
throw new NullReferenceException();
}
return reference;
}
/// <summary>
/// Throws NullReferenceException with given message if reference is null.
/// </summary>
public static T CheckNotNull<T> (T reference, string errorMessage)
{
if (reference == null)
{
throw new NullReferenceException(errorMessage);
}
return reference;
}
/// <summary>
/// Throws InvalidOperationException if condition is false.
/// </summary>
public static void CheckState(bool condition)
{
if (!condition)
{
throw new InvalidOperationException();
}
}
/// <summary>
/// Throws InvalidOperationException with given message if condition is false.
/// </summary>
public static void CheckState(bool condition, string errorMessage)
{
if (!condition)
{
throw new InvalidOperationException(errorMessage);
}
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading;
@ -38,25 +35,25 @@ using Grpc.Core;
namespace math
{
class MathClient
class MathClient
{
public static void Main (string[] args)
{
public static void Main(string[] args)
{
GrpcEnvironment.Initialize();
using (Channel channel = new Channel("127.0.0.1:23456"))
{
MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
MathExamples.DivExample(stub);
using (Channel channel = new Channel("127.0.0.1:23456"))
{
MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel);
MathExamples.DivExample(stub);
MathExamples.FibExample(stub);
MathExamples.SumExample(stub);
MathExamples.SumExample(stub);
MathExamples.DivManyExample(stub);
}
MathExamples.DivManyExample(stub);
}
GrpcEnvironment.Shutdown();
}
}
}
}
}

@ -1,5 +1,4 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
@ -28,7 +27,6 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
@ -39,59 +37,63 @@ using Grpc.Core.Utils;
namespace math
{
public static class MathExamples
{
public static void DivExample(MathGrpc.IMathServiceClient stub)
{
DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Console.WriteLine("Div Result: " + result);
}
public static void DivAsyncExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void FibExample(MathGrpc.IMathServiceClient stub)
{
public static class MathExamples
{
public static void DivExample(MathGrpc.IMathServiceClient stub)
{
DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Console.WriteLine("Div Result: " + result);
}
public static void DivAsyncExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void FibExample(MathGrpc.IMathServiceClient stub)
{
var recorder = new RecordingObserver<Num>();
stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder);
List<Num> numbers = recorder.ToList().Result;
List<Num> numbers = recorder.ToList().Result;
Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result));
}
}
public static void SumExample(MathGrpc.IMathServiceClient stub)
{
List<Num> numbers = new List<Num>{new Num.Builder { Num_ = 1 }.Build(),
new Num.Builder { Num_ = 2 }.Build(),
new Num.Builder { Num_ = 3 }.Build()};
public static void SumExample(MathGrpc.IMathServiceClient stub)
{
List<Num> numbers = new List<Num>
{new Num.Builder { Num_ = 1 }.Build(),
new Num.Builder { Num_ = 2 }.Build(),
new Num.Builder { Num_ = 3 }.Build()
};
var res = stub.Sum();
foreach (var num in numbers) {
foreach (var num in numbers)
{
res.Inputs.OnNext(num);
}
res.Inputs.OnCompleted();
Console.WriteLine("Sum Result: " + res.Task.Result);
}
Console.WriteLine("Sum Result: " + res.Task.Result);
}
public static void DivManyExample(MathGrpc.IMathServiceClient stub)
{
List<DivArgs> divArgsList = new List<DivArgs>{
new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
public static void DivManyExample(MathGrpc.IMathServiceClient stub)
{
List<DivArgs> divArgsList = new List<DivArgs>
{
new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
var recorder = new RecordingObserver<DivReply>();
@ -102,30 +104,30 @@ namespace math
}
inputs.OnCompleted();
Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
}
Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
}
public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub)
{
var numberList = new List<Num>
{ new Num.Builder{ Num_ = 1 }.Build(),
new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build()
};
public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub)
{
var numberList = new List<Num>
{ new Num.Builder{ Num_ = 1 }.Build(),
new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build()
};
numberList.ToObservable();
numberList.ToObservable();
//IObserver<Num> numbers;
//Task<Num> call = stub.Sum(out numbers);
//foreach (var num in numberList)
//{
// numbers.OnNext(num);
//}
//numbers.OnCompleted();
//IObserver<Num> numbers;
//Task<Num> call = stub.Sum(out numbers);
//foreach (var num in numberList)
//{
// numbers.OnNext(num);
//}
//numbers.OnCompleted();
//Num sum = call.Result;
//Num sum = call.Result;
//DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build());
}
}
//DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build());
}
}
}

Loading…
Cancel
Save