From 607307d0beca6b3742ba446390603b42f5a57c19 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 11:05:45 -0800 Subject: [PATCH] Cleanup of AsyncCall.cs --- .../GrpcApiTests/MathClientServerTests.cs | 18 +- src/csharp/GrpcCore/GrpcEnvironment.cs | 2 +- src/csharp/GrpcCore/Internal/AsyncCall.cs | 325 ++++++++++-------- src/csharp/GrpcCore/ServerCallHandler.cs | 11 +- src/csharp/GrpcCoreTests/ClientServerTest.cs | 23 +- 5 files changed, 203 insertions(+), 176 deletions(-) diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs index bd298b0932f..9056142097b 100644 --- a/src/csharp/GrpcApiTests/MathClientServerTests.cs +++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs @@ -64,6 +64,15 @@ namespace math.Tests client = MathGrpc.NewStub(channel); } + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + [Test] public void Div1() { @@ -136,15 +145,6 @@ namespace math.Tests CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient)); CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder)); } - - [TestFixtureTearDown] - public void Cleanup() - { - channel.Dispose(); - - server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); - } } } diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs index c4f030267d2..55a6cac8f69 100644 --- a/src/csharp/GrpcCore/GrpcEnvironment.cs +++ b/src/csharp/GrpcCore/GrpcEnvironment.cs @@ -42,7 +42,7 @@ namespace Google.GRPC.Core /// public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 1; + const int THREAD_POOL_SIZE = 4; [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_init(); diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index ae7428978ea..ce0ba30d53d 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -42,15 +42,13 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { /// - /// Handle native call lifecycle and provides convenience methods. + /// Handles native call lifecycle and provides convenience methods. /// - internal class AsyncCall : IDisposable + internal class AsyncCall { readonly Func serializer; readonly Func deserializer; - // TODO: make sure the delegate doesn't get garbage collected while - // native callbacks are in the completion queue. readonly CompletionCallbackDelegate unaryResponseHandler; readonly CompletionCallbackDelegate finishedHandler; readonly CompletionCallbackDelegate writeFinishedHandler; @@ -59,35 +57,44 @@ namespace Google.GRPC.Core.Internal readonly CompletionCallbackDelegate finishedServersideHandler; object myLock = new object(); - bool disposed; + GCHandle gchandle; CallSafeHandle call; + bool disposed; bool server; + bool started; bool errorOccured; - bool cancelRequested; + bool readingDone; bool halfcloseRequested; bool halfclosed; - bool doneWithReading; - Nullable finishedStatus; + bool finished; + // Completion of a pending write if not null. TaskCompletionSource writeTcs; + + // Completion of a pending read if not null. TaskCompletionSource readTcs; - TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); - TaskCompletionSource halfcloseTcs = new TaskCompletionSource(); - TaskCompletionSource finishedTcs = new TaskCompletionSource(); + // Completion of a pending halfclose if not null. + TaskCompletionSource halfcloseTcs; + // Completion of a pending unary response if not null. TaskCompletionSource unaryResponseTcs; + // Set after status is received on client. Only used for server streaming and duplex streaming calls. + Nullable finishedStatus; + TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); + + // For streaming, the reads will be delivered to this observer. IObserver readObserver; public AsyncCall(Func serializer, Func deserializer) { this.serializer = serializer; this.deserializer = deserializer; - this.unaryResponseHandler = HandleUnaryResponseCompletion; + this.unaryResponseHandler = HandleUnaryResponse; this.finishedHandler = HandleFinished; this.writeFinishedHandler = HandleWriteFinished; this.readFinishedHandler = HandleReadFinished; @@ -95,46 +102,23 @@ namespace Google.GRPC.Core.Internal this.finishedServersideHandler = HandleFinishedServerside; } - /// - /// Initiates reading to given observer. - /// - public void StartReadingToStream(IObserver readObserver) { - lock (myLock) - { - CheckStarted(); - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } - this.readObserver = readObserver; - ReceiveMessageAsync(); - } - } - - public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - lock (myLock) - { - this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); - } + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) + { + InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); } public void InitializeServer(CallSafeHandle call) { - lock(myLock) - { - this.call = call; - started = true; - server = true; - } + InitializeInternal(call, true); } - public Task UnaryCallAsync(TWrite msg) { lock (myLock) { started = true; halfcloseRequested = true; + readingDone = true; // TODO: handle serialization error... byte[] payload = serializer(msg); @@ -151,6 +135,7 @@ namespace Google.GRPC.Core.Internal lock (myLock) { started = true; + readingDone = true; unaryResponseTcs = new TaskCompletionSource(); call.StartClientStreaming(unaryResponseHandler); @@ -191,15 +176,43 @@ namespace Google.GRPC.Core.Internal } } - public Task SendMessageAsync(TWrite msg) { + public Task ServerSideUnaryRequestCallAsync() + { lock (myLock) { + started = true; + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; + } + } + + public Task ServerSideStreamingRequestCallAsync(IObserver readObserver) + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + + if (this.readObserver != null) + { + throw new InvalidOperationException("Already registered an observer."); + } + this.readObserver = readObserver; + ReceiveMessageAsync(); + + return finishedServersideTcs.Task; + } + } + + public Task SendMessageAsync(TWrite msg) + { + lock (myLock) + { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } @@ -222,18 +235,19 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } call.StartSendCloseFromClient(halfclosedHandler); + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); return halfcloseTcs.Task; } } @@ -242,18 +256,18 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } call.StartSendStatusFromServer(status, halfclosedHandler); halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); return halfcloseTcs.Task; } } @@ -262,13 +276,11 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - // TODO: add check for not cancelled? - - if (doneWithReading) + if (readingDone) { throw new InvalidOperationException("Already read the last message."); } @@ -285,22 +297,12 @@ namespace Google.GRPC.Core.Internal } } - internal Task StartServerSide() - { - lock (myLock) - { - call.StartServerSide(finishedServersideHandler); - return finishedServersideTcs.Task; - } - } - public void Cancel() { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel is threadsafe @@ -311,41 +313,23 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel_with_status is threadsafe call.CancelWithStatus(status); } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (!disposed) - { - if (disposing) - { - if (call != null) - { - call.Dispose(); - } - } - disposed = true; - } - } - private void UpdateErrorOccured(GRPCOpError error) + private void InitializeInternal(CallSafeHandle call, bool server) { - if (error == GRPCOpError.GRPC_OP_ERROR) + lock (myLock) { - errorOccured = true; + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + this.server = server; } } @@ -357,41 +341,46 @@ namespace Google.GRPC.Core.Internal } } - private void CheckNoError() + private void CheckNotDisposed() { - if (errorOccured) + if (disposed) { - throw new InvalidOperationException("Error occured when processing call."); + throw new InvalidOperationException("Call has already been disposed."); } } - private void CheckNotFinished() + private void CheckNoError() { - if (finishedStatus.HasValue) + if (errorOccured) { - throw new InvalidOperationException("Already finished."); + throw new InvalidOperationException("Error occured when processing call."); } } - private void CheckCancelNotRequested() + private bool ReleaseResourcesIfPossible() { - if (cancelRequested) + if (!disposed && call != null) { - throw new InvalidOperationException("Cancel has been requested."); + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } } + return false; } - private void DisposeResourcesIfNeeded() + private void ReleaseResources() { - if (call != null && started && finishedStatus.HasValue) - { - // TODO: should we also wait for all the pending events to finish? - + if (call != null) { call.Dispose(); } + gchandle.Free(); + disposed = true; } - private void CompleteStreamObserver(Status status) { + private void CompleteStreamObserver(Status status) + { if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { // TODO: wrap to handle exceptions; @@ -402,20 +391,27 @@ namespace Google.GRPC.Core.Internal } } - private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) { - try { - + /// + /// Handler for unary response completion. + /// + private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + { + try + { TaskCompletionSource tcs; - lock(myLock) { + lock(myLock) + { + finished = true; + halfclosed = true; tcs = unaryResponseTcs; - } - // we're done with this call, get rid of the native object. - call.Dispose(); + ReleaseResourcesIfPossible(); + } var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - if (error != GRPCOpError.GRPC_OP_OK) { + if (error != GRPCOpError.GRPC_OP_OK) + { tcs.SetException(new RpcException( new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.") )); @@ -423,7 +419,8 @@ namespace Google.GRPC.Core.Internal } var status = ctx.GetReceivedStatus(); - if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) + { tcs.SetException(new RpcException(status)); return; } @@ -431,18 +428,20 @@ namespace Google.GRPC.Core.Internal // TODO: handle deserialize error... var msg = deserializer(ctx.GetReceivedMessage()); tcs.SetResult(msg); - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { - + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { TaskCompletionSource oldTcs = null; lock (myLock) { - UpdateErrorOccured(error); oldTcs = writeTcs; writeTcs = null; } @@ -458,20 +457,25 @@ namespace Google.GRPC.Core.Internal oldTcs.SetResult(null); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) + { + try + { lock (myLock) { - UpdateErrorOccured(error); halfclosed = true; + + ReleaseResourcesIfPossible(); } - if (errorOccured) + if (error != GRPCOpError.GRPC_OP_OK) { halfcloseTcs.SetException(new Exception("Halfclose failed")); @@ -480,14 +484,17 @@ namespace Google.GRPC.Core.Internal { halfcloseTcs.SetResult(null); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { - + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); var payload = ctx.GetReceivedMessage(); @@ -502,7 +509,7 @@ namespace Google.GRPC.Core.Internal readTcs = null; if (payload == null) { - doneWithReading = true; + readingDone = true; } observer = readObserver; status = finishedStatus; @@ -515,7 +522,8 @@ namespace Google.GRPC.Core.Internal // TODO: make sure we deliver reads in the right order. - if (observer != null) { + if (observer != null) + { if (payload != null) { // TODO: wrap to handle exceptions @@ -526,58 +534,81 @@ namespace Google.GRPC.Core.Internal } else { - if (!server) { - if (status.HasValue) { + if (!server) + { + if (status.HasValue) + { CompleteStreamObserver(status.Value); } - } else { + } + else + { // TODO: wrap to handle exceptions.. observer.OnCompleted(); } // TODO: completeStreamObserver serverside... } } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); var status = ctx.GetReceivedStatus(); - bool wasDoneWithReading; + bool wasReadingDone; lock (myLock) { + finished = true; finishedStatus = status; - DisposeResourcesIfNeeded(); + wasReadingDone = readingDone; - wasDoneWithReading = doneWithReading; + ReleaseResourcesIfPossible(); } - if (wasDoneWithReading) { + if (wasReadingDone) { CompleteStreamObserver(status); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + lock(myLock) + { + finished = true; + + // TODO: because of the way server calls are implemented, we need to set + // reading done to true here. Should be fixed in the future. + readingDone = true; + + ReleaseResourcesIfPossible(); + } // TODO: handle error ... finishedServersideTcs.SetResult(null); - call.Dispose(); - - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index 3bc3b153964..73dfa52def8 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -60,7 +60,7 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); var request = asyncCall.ReceiveMessageAsync().Result; @@ -91,14 +91,9 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); - var responseObserver = new ServerStreamingOutputObserver(asyncCall); var requestObserver = handler(responseObserver); - - // feed the requests - asyncCall.StartReadingToStream(requestObserver); - + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); finishedTask.Wait(); } } @@ -114,7 +109,7 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index dd3fc7038e7..37d770e0c04 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -52,11 +52,21 @@ namespace Google.GRPC.Core.Tests Marshallers.StringMarshaller, Marshallers.StringMarshaller); - [Test] - public void UnaryCall() + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + } + + [TestFixtureTearDown] + public void Cleanup() { GrpcEnvironment.Initialize(); + } + [Test] + public void UnaryCall() + { Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService") @@ -82,8 +92,6 @@ namespace Google.GRPC.Core.Tests [Test] public void UnaryCallPerformance() { - GrpcEnvironment.Initialize(); - Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService") @@ -107,16 +115,11 @@ namespace Google.GRPC.Core.Tests } server.ShutdownAsync().Wait(); - - GrpcEnvironment.Shutdown(); } - [Test] public void UnknownMethodHandler() { - GrpcEnvironment.Initialize(); - Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService").Build()); @@ -137,8 +140,6 @@ namespace Google.GRPC.Core.Tests } server.ShutdownAsync().Wait(); - - GrpcEnvironment.Shutdown(); } private void HandleUnaryEchoString(string request, IObserver responseObserver) {