Cleanup of AsyncCall.cs

pull/578/head
Jan Tattermusch 10 years ago
parent a96afb013b
commit 607307d0be
  1. 18
      src/csharp/GrpcApiTests/MathClientServerTests.cs
  2. 2
      src/csharp/GrpcCore/GrpcEnvironment.cs
  3. 325
      src/csharp/GrpcCore/Internal/AsyncCall.cs
  4. 11
      src/csharp/GrpcCore/ServerCallHandler.cs
  5. 23
      src/csharp/GrpcCoreTests/ClientServerTest.cs

@ -64,6 +64,15 @@ namespace math.Tests
client = MathGrpc.NewStub(channel);
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
public void Div1()
{
@ -136,15 +145,6 @@ namespace math.Tests
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -42,7 +42,7 @@ namespace Google.GRPC.Core
/// </summary>
public class GrpcEnvironment
{
const int THREAD_POOL_SIZE = 1;
const int THREAD_POOL_SIZE = 4;
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_init();

@ -42,15 +42,13 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Handle native call lifecycle and provides convenience methods.
/// Handles native call lifecycle and provides convenience methods.
/// </summary>
internal class AsyncCall<TWrite, TRead> : IDisposable
internal class AsyncCall<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
readonly CompletionCallbackDelegate writeFinishedHandler;
@ -59,35 +57,44 @@ namespace Google.GRPC.Core.Internal
readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
bool disposed;
GCHandle gchandle;
CallSafeHandle call;
bool disposed;
bool server;
bool started;
bool errorOccured;
bool cancelRequested;
bool readingDone;
bool halfcloseRequested;
bool halfclosed;
bool doneWithReading;
Nullable<Status> finishedStatus;
bool finished;
// Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
// Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
// Completion of a pending halfclose if not null.
TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
TaskCompletionSource<TRead> unaryResponseTcs;
// Set after status is received on client. Only used for server streaming and duplex streaming calls.
Nullable<Status> finishedStatus;
TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
// For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.unaryResponseHandler = HandleUnaryResponseCompletion;
this.unaryResponseHandler = HandleUnaryResponse;
this.finishedHandler = HandleFinished;
this.writeFinishedHandler = HandleWriteFinished;
this.readFinishedHandler = HandleReadFinished;
@ -95,46 +102,23 @@ namespace Google.GRPC.Core.Internal
this.finishedServersideHandler = HandleFinishedServerside;
}
/// <summary>
/// Initiates reading to given observer.
/// </summary>
public void StartReadingToStream(IObserver<TRead> readObserver) {
lock (myLock)
{
CheckStarted();
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
ReceiveMessageAsync();
}
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) {
lock (myLock)
{
this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
}
public void InitializeServer(CallSafeHandle call)
{
lock(myLock)
{
this.call = call;
started = true;
server = true;
}
InitializeInternal(call, true);
}
public Task<TRead> UnaryCallAsync(TWrite msg)
{
lock (myLock)
{
started = true;
halfcloseRequested = true;
readingDone = true;
// TODO: handle serialization error...
byte[] payload = serializer(msg);
@ -151,6 +135,7 @@ namespace Google.GRPC.Core.Internal
lock (myLock)
{
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartClientStreaming(unaryResponseHandler);
@ -191,15 +176,43 @@ namespace Google.GRPC.Core.Internal
}
}
public Task SendMessageAsync(TWrite msg) {
public Task ServerSideUnaryRequestCallAsync()
{
lock (myLock)
{
started = true;
call.StartServerSide(finishedServersideHandler);
return finishedServersideTcs.Task;
}
}
public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
{
lock (myLock)
{
started = true;
call.StartServerSide(finishedServersideHandler);
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
ReceiveMessageAsync();
return finishedServersideTcs.Task;
}
}
public Task SendMessageAsync(TWrite msg)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
@ -222,18 +235,19 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
@ -242,18 +256,18 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
@ -262,13 +276,11 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
// TODO: add check for not cancelled?
if (doneWithReading)
if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
@ -285,22 +297,12 @@ namespace Google.GRPC.Core.Internal
}
}
internal Task StartServerSide()
{
lock (myLock)
{
call.StartServerSide(finishedServersideHandler);
return finishedServersideTcs.Task;
}
}
public void Cancel()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
@ -311,41 +313,23 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (call != null)
{
call.Dispose();
}
}
disposed = true;
}
}
private void UpdateErrorOccured(GRPCOpError error)
private void InitializeInternal(CallSafeHandle call, bool server)
{
if (error == GRPCOpError.GRPC_OP_ERROR)
lock (myLock)
{
errorOccured = true;
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
this.server = server;
}
}
@ -357,41 +341,46 @@ namespace Google.GRPC.Core.Internal
}
}
private void CheckNoError()
private void CheckNotDisposed()
{
if (errorOccured)
if (disposed)
{
throw new InvalidOperationException("Error occured when processing call.");
throw new InvalidOperationException("Call has already been disposed.");
}
}
private void CheckNotFinished()
private void CheckNoError()
{
if (finishedStatus.HasValue)
if (errorOccured)
{
throw new InvalidOperationException("Already finished.");
throw new InvalidOperationException("Error occured when processing call.");
}
}
private void CheckCancelNotRequested()
private bool ReleaseResourcesIfPossible()
{
if (cancelRequested)
if (!disposed && call != null)
{
throw new InvalidOperationException("Cancel has been requested.");
if (halfclosed && readingDone && finished)
{
ReleaseResources();
return true;
}
}
return false;
}
private void DisposeResourcesIfNeeded()
private void ReleaseResources()
{
if (call != null && started && finishedStatus.HasValue)
{
// TODO: should we also wait for all the pending events to finish?
if (call != null) {
call.Dispose();
}
gchandle.Free();
disposed = true;
}
private void CompleteStreamObserver(Status status) {
private void CompleteStreamObserver(Status status)
{
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
// TODO: wrap to handle exceptions;
@ -402,20 +391,27 @@ namespace Google.GRPC.Core.Internal
}
}
private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) {
try {
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
TaskCompletionSource<TRead> tcs;
lock(myLock) {
lock(myLock)
{
finished = true;
halfclosed = true;
tcs = unaryResponseTcs;
}
// we're done with this call, get rid of the native object.
call.Dispose();
ReleaseResourcesIfPossible();
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK) {
if (error != GRPCOpError.GRPC_OP_OK)
{
tcs.SetException(new RpcException(
new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
));
@ -423,7 +419,8 @@ namespace Google.GRPC.Core.Internal
}
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.GRPC_STATUS_OK) {
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
tcs.SetException(new RpcException(status));
return;
}
@ -431,18 +428,20 @@ namespace Google.GRPC.Core.Internal
// TODO: handle deserialize error...
var msg = deserializer(ctx.GetReceivedMessage());
tcs.SetResult(msg);
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) {
try {
private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
}
@ -458,20 +457,25 @@ namespace Google.GRPC.Core.Internal
oldTcs.SetResult(null);
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) {
try {
private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
lock (myLock)
{
UpdateErrorOccured(error);
halfclosed = true;
ReleaseResourcesIfPossible();
}
if (errorOccured)
if (error != GRPCOpError.GRPC_OP_OK)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
@ -480,14 +484,17 @@ namespace Google.GRPC.Core.Internal
{
halfcloseTcs.SetResult(null);
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) {
try {
private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var payload = ctx.GetReceivedMessage();
@ -502,7 +509,7 @@ namespace Google.GRPC.Core.Internal
readTcs = null;
if (payload == null)
{
doneWithReading = true;
readingDone = true;
}
observer = readObserver;
status = finishedStatus;
@ -515,7 +522,8 @@ namespace Google.GRPC.Core.Internal
// TODO: make sure we deliver reads in the right order.
if (observer != null) {
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
@ -526,58 +534,81 @@ namespace Google.GRPC.Core.Internal
}
else
{
if (!server) {
if (status.HasValue) {
if (!server)
{
if (status.HasValue)
{
CompleteStreamObserver(status.Value);
}
} else {
}
else
{
// TODO: wrap to handle exceptions..
observer.OnCompleted();
}
// TODO: completeStreamObserver serverside...
}
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) {
try {
private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var status = ctx.GetReceivedStatus();
bool wasDoneWithReading;
bool wasReadingDone;
lock (myLock)
{
finished = true;
finishedStatus = status;
DisposeResourcesIfNeeded();
wasReadingDone = readingDone;
wasDoneWithReading = doneWithReading;
ReleaseResourcesIfPossible();
}
if (wasDoneWithReading) {
if (wasReadingDone) {
CompleteStreamObserver(status);
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) {
try {
private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
lock(myLock)
{
finished = true;
// TODO: because of the way server calls are implemented, we need to set
// reading done to true here. Should be fixed in the future.
readingDone = true;
ReleaseResourcesIfPossible();
}
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
call.Dispose();
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}

@ -60,7 +60,7 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
var finishedTask = asyncCall.StartServerSide();
var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
var request = asyncCall.ReceiveMessageAsync().Result;
@ -91,14 +91,9 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
var finishedTask = asyncCall.StartServerSide();
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
// feed the requests
asyncCall.StartReadingToStream(requestObserver);
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
finishedTask.Wait();
}
}
@ -114,7 +109,7 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
var finishedTask = asyncCall.StartServerSide();
var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();

@ -52,11 +52,21 @@ namespace Google.GRPC.Core.Tests
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
[Test]
public void UnaryCall()
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Initialize();
}
[Test]
public void UnaryCall()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@ -82,8 +92,6 @@ namespace Google.GRPC.Core.Tests
[Test]
public void UnaryCallPerformance()
{
GrpcEnvironment.Initialize();
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@ -107,16 +115,11 @@ namespace Google.GRPC.Core.Tests
}
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
public void UnknownMethodHandler()
{
GrpcEnvironment.Initialize();
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build());
@ -137,8 +140,6 @@ namespace Google.GRPC.Core.Tests
}
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {

Loading…
Cancel
Save