From dbff2e04ba4f38d4035feffcd6f10303d9dfbf26 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 12 Nov 2019 17:37:25 -0800 Subject: [PATCH] C# health checks watch --- .../HealthClientServerTest.cs | 2 +- .../HealthServiceImplTest.cs | 37 ++++- .../Grpc.HealthCheck.Tests/NUnitMain.cs | 2 +- .../TestResponseStreamWriter.cs | 46 ++++++ .../TestServerCallContext.cs | 55 +++++++ .../Grpc.HealthCheck/HealthServiceImpl.cs | 146 +++++++++++++++++- src/csharp/Grpc.sln | 9 +- 7 files changed, 284 insertions(+), 13 deletions(-) create mode 100644 src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs create mode 100644 src/csharp/Grpc.HealthCheck.Tests/TestServerCallContext.cs diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 45e8ad90623..371825bc447 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs index 54c77de0628..a1932420d6f 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -83,6 +84,40 @@ namespace Grpc.HealthCheck.Tests Assert.Throws(typeof(ArgumentNullException), () => impl.ClearStatus(null)); } + [Test] + public async Task Watch() + { + var cts = new CancellationTokenSource(); + var context = new TestServerCallContext(cts.Token); + var writer = new TestResponseStreamWriter(); + + var impl = new HealthServiceImpl(); + var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context); + + var nextWriteTask = writer.WaitNextAsync(); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Serving, (await nextWriteTask).Status); + + nextWriteTask = writer.WaitNextAsync(); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.NotServing); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.NotServing, (await nextWriteTask).Status); + + nextWriteTask = writer.WaitNextAsync(); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Unknown); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Unknown, (await nextWriteTask).Status); + + nextWriteTask = writer.WaitNextAsync(); + impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.Serving); + Assert.IsFalse(nextWriteTask.IsCompleted); + + nextWriteTask = writer.WaitNextAsync(); + impl.ClearStatus(""); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, (await nextWriteTask).Status); + + cts.Cancel(); + await callTask; + } + private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string service) { return impl.Check(new HealthCheckRequest { Service = service }, null).Result.Status; diff --git a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs index db6d32a5b25..b7126213889 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs @@ -33,7 +33,7 @@ namespace Grpc.HealthCheck.Tests public static int Main(string[] args) { // Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406. - GrpcEnvironment.SetLogger(new ConsoleLogger()); + //GrpcEnvironment.SetLogger(new ConsoleLogger()); return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args); } } diff --git a/src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs b/src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs new file mode 100644 index 00000000000..52e585393de --- /dev/null +++ b/src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs @@ -0,0 +1,46 @@ +#region Copyright notice and license +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +using System.Threading.Tasks; + +using Grpc.Core; +using Grpc.Health.V1; + +namespace Grpc.HealthCheck.Tests +{ + private class TestResponseStreamWriter : IServerStreamWriter + { + private TaskCompletionSource _tcs; + + public WriteOptions WriteOptions { get; set; } + + public Task WaitNextAsync() + { + _tcs = new TaskCompletionSource(); + return _tcs.Task; + } + + public Task WriteAsync(HealthCheckResponse message) + { + if (_tcs != null) + { + _tcs.TrySetResult(message); + } + + return Task.FromResult(null); + } + } +} diff --git a/src/csharp/Grpc.HealthCheck.Tests/TestServerCallContext.cs b/src/csharp/Grpc.HealthCheck.Tests/TestServerCallContext.cs new file mode 100644 index 00000000000..30cd0a14a5d --- /dev/null +++ b/src/csharp/Grpc.HealthCheck.Tests/TestServerCallContext.cs @@ -0,0 +1,55 @@ +#region Copyright notice and license +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; + +using Grpc.Core; + +namespace Grpc.HealthCheck.Tests +{ + internal class TestServerCallContext : ServerCallContext + { + private readonly CancellationToken _cancellationToken; + + public TestServerCallContext(CancellationToken cancellationToken) + { + _cancellationToken = cancellationToken; + } + + protected override string MethodCore { get; } + protected override string HostCore { get; } + protected override string PeerCore { get; } + protected override DateTime DeadlineCore { get; } + protected override Metadata RequestHeadersCore { get; } + protected override CancellationToken CancellationTokenCore => _cancellationToken; + protected override Metadata ResponseTrailersCore { get; } + protected override Status StatusCore { get; set; } + protected override WriteOptions WriteOptionsCore { get; set; } + protected override AuthContext AuthContextCore { get; } + + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options) + { + throw new NotImplementedException(); + } + + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs b/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs index 38c4900346e..a67c3ad5589 100644 --- a/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs +++ b/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,6 +15,7 @@ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -39,8 +40,10 @@ namespace Grpc.HealthCheck public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase { private readonly object myLock = new object(); - private readonly Dictionary statusMap = + private readonly Dictionary statusMap = new Dictionary(); + private readonly Dictionary>> watchers = + new Dictionary>>(); /// /// Sets the health status for given service. @@ -51,7 +54,13 @@ namespace Grpc.HealthCheck { lock (myLock) { + HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service); statusMap[service] = status; + + if (status != previousStatus) + { + NotifyStatus(service, status); + } } } @@ -63,10 +72,16 @@ namespace Grpc.HealthCheck { lock (myLock) { + HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service); statusMap.Remove(service); + + if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown) + { + NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown); + } } } - + /// /// Clears statuses for all services. /// @@ -74,7 +89,17 @@ namespace Grpc.HealthCheck { lock (myLock) { + List> statuses = statusMap.ToList(); + statusMap.Clear(); + + foreach (KeyValuePair status in statuses) + { + if (status.Value != HealthCheckResponse.Types.ServingStatus.Unknown) + { + NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.Unknown); + } + } } } @@ -86,17 +111,124 @@ namespace Grpc.HealthCheck /// The asynchronous response. public override Task Check(HealthCheckRequest request, ServerCallContext context) { + HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true); + + return Task.FromResult(response); + } + + /// + /// Performs a watch for the serving status of the requested service. + /// The server will immediately send back a message indicating the current + /// serving status. It will then subsequently send a new message whenever + /// the service's serving status changes. + /// + /// If the requested service is unknown when the call is received, the + /// server will send a message setting the serving status to + /// SERVICE_UNKNOWN but will *not* terminate the call. If at some + /// future point, the serving status of the service becomes known, the + /// server will send a new message with the service's serving status. + /// + /// If the call terminates with status UNIMPLEMENTED, then clients + /// should assume this method is not supported and should not retry the + /// call. If the call terminates with any other status (including OK), + /// clients should retry the call with appropriate exponential backoff. + /// + /// The request received from the client. + /// Used for sending responses back to the client. + /// The context of the server-side call handler being invoked. + /// A task indicating completion of the handler. + public override async Task Watch(HealthCheckRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + string service = request.Service; + TaskCompletionSource watchTcs = new TaskCompletionSource(); + + HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false); + await responseStream.WriteAsync(response); + lock (myLock) { - var service = request.Service; + if (!watchers.TryGetValue(service, out List> serverStreamWriters)) + { + serverStreamWriters = new List>(); + watchers.Add(service, serverStreamWriters); + } + + serverStreamWriters.Add(responseStream); + } + + // Handle the Watch call being canceled + context.CancellationToken.Register(() => { + lock (myLock) + { + if (watchers.TryGetValue(service, out List> serverStreamWriters)) + { + // Remove the response stream from the watchers + if (serverStreamWriters.Remove(responseStream)) + { + // Remove empty collection if service has no more response streams + if (serverStreamWriters.Count == 0) + { + watchers.Remove(service); + } + } + } + } + + // Allow watch method to exit. + watchTcs.TrySetResult(null); + }); + // Wait for call to be cancelled before exiting. + await watchTcs.Task; + } + + private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound) + { + HealthCheckResponse response = null; + lock (myLock) + { HealthCheckResponse.Types.ServingStatus status; if (!statusMap.TryGetValue(service, out status)) { - // TODO(jtattermusch): returning specific status from server handler is not supported yet. - throw new RpcException(new Status(StatusCode.NotFound, "")); + if (throwOnNotFound) + { + // TODO(jtattermusch): returning specific status from server handler is not supported yet. + throw new RpcException(new Status(StatusCode.NotFound, "")); + } + else + { + status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown; + } + } + response = new HealthCheckResponse { Status = status }; + } + + return response; + } + + private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service) + { + if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s)) + { + return s; + } + else + { + return HealthCheckResponse.Types.ServingStatus.ServiceUnknown; + } + } + + private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status) + { + if (watchers.TryGetValue(service, out List> serverStreamWriters)) + { + HealthCheckResponse response = new HealthCheckResponse { Status = status }; + + foreach (IServerStreamWriter serverStreamWriter in serverStreamWriters) + { + // TODO(JamesNK): This will fail if a pending write is already in progress. + _ = serverStreamWriter.WriteAsync(response); } - return Task.FromResult(new HealthCheckResponse { Status = status }); } } } diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index 25030cc110e..19f3605d8f6 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.26430.4 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29505.145 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Core.Api", "Grpc.Core.Api\Grpc.Core.Api.csproj", "{63FCEA50-1505-11E9-B56E-0800200C9A66}" EndProject @@ -51,7 +51,7 @@ Global Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.Build.0 = Debug|Any CPU {63FCEA50-1505-11E9-B56E-0800200C9A66}.Release|Any CPU.ActiveCfg = Release|Any CPU {63FCEA50-1505-11E9-B56E-0800200C9A66}.Release|Any CPU.Build.0 = Release|Any CPU @@ -139,4 +139,7 @@ Global GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {BF5C0B7B-764F-4668-A052-A12BCCDA7304} + EndGlobalSection EndGlobal