From 87bb6ed2bfb5b79cff6e91c961b80c7b0b101f3e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 14 Nov 2019 11:06:18 +0100 Subject: [PATCH 1/4] add UnobservedTaskExceptionTest --- .../UnobservedTaskExceptionTest.cs | 99 +++++++++++++++++++ src/csharp/tests.json | 3 +- 2 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs diff --git a/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs b/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs new file mode 100644 index 00000000000..a656b26dae5 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs @@ -0,0 +1,99 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Utils; +using Grpc.Testing; +using NUnit.Framework; + +namespace Grpc.IntegrationTesting +{ + /// + /// Runs interop tests in-process. + /// + public class UnobservedTaskExceptionTest + { + const string Host = "localhost"; + Server server; + Channel channel; + TestService.TestServiceClient client; + + [OneTimeSetUp] + public void Init() + { + // Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755 + server = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) }) + { + Services = { TestService.BindService(new TestServiceImpl()) }, + Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } + }; + server.Start(); + + int port = server.Ports.Single().BoundPort; + channel = new Channel(Host, port, ChannelCredentials.Insecure); + client = new TestService.TestServiceClient(channel); + } + + [OneTimeTearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public async Task NoUnobservedTaskExceptionForAbandonedStreamingResponse() + { + // Verify that https://github.com/grpc/grpc/issues/17458 has been fixed. + // Create a streaming response call, then cancel it without reading all the responses + // and check that no unobserved task exceptions have been thrown. + + int unobservedTaskExceptionCounter = 0; + + TaskScheduler.UnobservedTaskException += (sender, e) => { + unobservedTaskExceptionCounter++; + Console.WriteLine("Detected unobserved task exception: " + e.Exception); + }; + + var bodySizes = new List { 10, 10, 10, 10, 10 }; + var request = new StreamingOutputCallRequest { + ResponseParameters = { bodySizes.Select((size) => new ResponseParameters { Size = size }) } + }; + + for (int i = 0; i < 50; i++) + { + Console.WriteLine($"Starting iteration {i}"); + using (var call = client.StreamingOutputCall(request)) + { + // Intentionally only read the first response (we know there's more) + // The call will be cancelled as soon as we leave the "using" statement. + var firstResponse = await call.ResponseStream.MoveNext(); + } + // Make it more likely to trigger the "Unobserved task exception" warning + GC.Collect(); + } + + Assert.AreEqual(0, unobservedTaskExceptionCounter); + } + } +} diff --git a/src/csharp/tests.json b/src/csharp/tests.json index e18266ab9d7..5b9ca6dbd14 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -68,7 +68,8 @@ "Grpc.IntegrationTesting.InteropClientServerTest", "Grpc.IntegrationTesting.MetadataCredentialsTest", "Grpc.IntegrationTesting.RunnerClientServerTest", - "Grpc.IntegrationTesting.SslCredentialsTest" + "Grpc.IntegrationTesting.SslCredentialsTest", + "Grpc.IntegrationTesting.UnobservedTaskExceptionTest" ], "Grpc.Reflection.Tests": [ "Grpc.Reflection.Tests.ReflectionClientServerTest", From d34f7f3df5837babe78d4f8e2ab51d3b4f4c36a7 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 14 Nov 2019 11:20:43 +0100 Subject: [PATCH 2/4] fix unobserved task exception problem for non-exhausted response streams --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 830a1f4edc6..899af94fb6c 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -626,6 +626,14 @@ namespace Grpc.Core.Internal if (status.StatusCode != StatusCode.OK) { streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers)); + if (status.StatusCode == StatusCode.Cancelled) + { + // Make sure the exception set to the Task is observed, + // otherwise this can trigger "Unobserved exception" when the response stream + // is not read until its end and the task created by the TCS is garbage collected. + // See https://github.com/grpc/grpc/issues/17458 + var _ = streamingResponseCallFinishedTcs.Task.Exception; + } return; } From 7eeb1e564d537538cb55e5fa316e46aecf82f088 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 26 Nov 2019 21:41:12 +0100 Subject: [PATCH 3/4] address review comments --- .../Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs b/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs index a656b26dae5..db2c6c37f3f 100644 --- a/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/UnobservedTaskExceptionTest.cs @@ -22,6 +22,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Grpc.Core; +using Grpc.Core.Internal; using Grpc.Core.Utils; using Grpc.Testing; using NUnit.Framework; @@ -68,10 +69,10 @@ namespace Grpc.IntegrationTesting // Create a streaming response call, then cancel it without reading all the responses // and check that no unobserved task exceptions have been thrown. - int unobservedTaskExceptionCounter = 0; + var unobservedTaskExceptionCounter = new AtomicCounter(); TaskScheduler.UnobservedTaskException += (sender, e) => { - unobservedTaskExceptionCounter++; + unobservedTaskExceptionCounter.Increment(); Console.WriteLine("Detected unobserved task exception: " + e.Exception); }; @@ -93,7 +94,7 @@ namespace Grpc.IntegrationTesting GC.Collect(); } - Assert.AreEqual(0, unobservedTaskExceptionCounter); + Assert.AreEqual(0, unobservedTaskExceptionCounter.Count); } } } From 06b420d3148fc73bc5190072b48b6d986ed39ebd Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 27 Nov 2019 11:55:01 +0100 Subject: [PATCH 4/4] consider cancelRequested flag too --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 899af94fb6c..6a7466ac278 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -599,6 +599,7 @@ namespace Grpc.Core.Internal TaskCompletionSource delayedStreamingWriteTcs = null; bool releasedResources; + bool origCancelRequested; lock (myLock) { finished = true; @@ -610,6 +611,7 @@ namespace Grpc.Core.Internal } releasedResources = ReleaseResourcesIfPossible(); + origCancelRequested = cancelRequested; } if (releasedResources) @@ -626,7 +628,7 @@ namespace Grpc.Core.Internal if (status.StatusCode != StatusCode.OK) { streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers)); - if (status.StatusCode == StatusCode.Cancelled) + if (status.StatusCode == StatusCode.Cancelled || origCancelRequested) { // Make sure the exception set to the Task is observed, // otherwise this can trigger "Unobserved exception" when the response stream