C# health checks watch

pull/21120/head
James Newton-King 5 years ago
parent 2d80e830c0
commit dbff2e04ba
No known key found for this signature in database
GPG Key ID: A66B2F456BF5526
  1. 2
      src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
  2. 37
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  3. 2
      src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs
  4. 46
      src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs
  5. 55
      src/csharp/Grpc.HealthCheck.Tests/TestServerCallContext.cs
  6. 146
      src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs
  7. 9
      src/csharp/Grpc.sln

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -18,6 +18,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
@ -83,6 +84,40 @@ namespace Grpc.HealthCheck.Tests
Assert.Throws(typeof(ArgumentNullException), () => impl.ClearStatus(null));
}
[Test]
public async Task Watch()
{
var cts = new CancellationTokenSource();
var context = new TestServerCallContext(cts.Token);
var writer = new TestResponseStreamWriter();
var impl = new HealthServiceImpl();
var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context);
var nextWriteTask = writer.WaitNextAsync();
impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving);
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Serving, (await nextWriteTask).Status);
nextWriteTask = writer.WaitNextAsync();
impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.NotServing);
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.NotServing, (await nextWriteTask).Status);
nextWriteTask = writer.WaitNextAsync();
impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Unknown);
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Unknown, (await nextWriteTask).Status);
nextWriteTask = writer.WaitNextAsync();
impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.Serving);
Assert.IsFalse(nextWriteTask.IsCompleted);
nextWriteTask = writer.WaitNextAsync();
impl.ClearStatus("");
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, (await nextWriteTask).Status);
cts.Cancel();
await callTask;
}
private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string service)
{
return impl.Check(new HealthCheckRequest { Service = service }, null).Result.Status;

@ -33,7 +33,7 @@ namespace Grpc.HealthCheck.Tests
public static int Main(string[] args)
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
//GrpcEnvironment.SetLogger(new ConsoleLogger());
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args);
}
}

@ -0,0 +1,46 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Health.V1;
namespace Grpc.HealthCheck.Tests
{
private class TestResponseStreamWriter : IServerStreamWriter<HealthCheckResponse>
{
private TaskCompletionSource<HealthCheckResponse> _tcs;
public WriteOptions WriteOptions { get; set; }
public Task<HealthCheckResponse> WaitNextAsync()
{
_tcs = new TaskCompletionSource<HealthCheckResponse>();
return _tcs.Task;
}
public Task WriteAsync(HealthCheckResponse message)
{
if (_tcs != null)
{
_tcs.TrySetResult(message);
}
return Task.FromResult<object>(null);
}
}
}

@ -0,0 +1,55 @@
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
namespace Grpc.HealthCheck.Tests
{
internal class TestServerCallContext : ServerCallContext
{
private readonly CancellationToken _cancellationToken;
public TestServerCallContext(CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
}
protected override string MethodCore { get; }
protected override string HostCore { get; }
protected override string PeerCore { get; }
protected override DateTime DeadlineCore { get; }
protected override Metadata RequestHeadersCore { get; }
protected override CancellationToken CancellationTokenCore => _cancellationToken;
protected override Metadata ResponseTrailersCore { get; }
protected override Status StatusCore { get; set; }
protected override WriteOptions WriteOptionsCore { get; set; }
protected override AuthContext AuthContextCore { get; }
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
{
throw new NotImplementedException();
}
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
throw new NotImplementedException();
}
}
}

@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -15,6 +15,7 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -39,8 +40,10 @@ namespace Grpc.HealthCheck
public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
{
private readonly object myLock = new object();
private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();
private readonly Dictionary<string, List<IServerStreamWriter<HealthCheckResponse>>> watchers =
new Dictionary<string, List<IServerStreamWriter<HealthCheckResponse>>>();
/// <summary>
/// Sets the health status for given service.
@ -51,7 +54,13 @@ namespace Grpc.HealthCheck
{
lock (myLock)
{
HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
statusMap[service] = status;
if (status != previousStatus)
{
NotifyStatus(service, status);
}
}
}
@ -63,10 +72,16 @@ namespace Grpc.HealthCheck
{
lock (myLock)
{
HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
statusMap.Remove(service);
if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
{
NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
}
}
}
/// <summary>
/// Clears statuses for all services.
/// </summary>
@ -74,7 +89,17 @@ namespace Grpc.HealthCheck
{
lock (myLock)
{
List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses = statusMap.ToList();
statusMap.Clear();
foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
{
if (status.Value != HealthCheckResponse.Types.ServingStatus.Unknown)
{
NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.Unknown);
}
}
}
}
@ -86,17 +111,124 @@ namespace Grpc.HealthCheck
/// <returns>The asynchronous response.</returns>
public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
{
HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);
return Task.FromResult(response);
}
/// <summary>
/// Performs a watch for the serving status of the requested service.
/// The server will immediately send back a message indicating the current
/// serving status. It will then subsequently send a new message whenever
/// the service's serving status changes.
///
/// If the requested service is unknown when the call is received, the
/// server will send a message setting the serving status to
/// SERVICE_UNKNOWN but will *not* terminate the call. If at some
/// future point, the serving status of the service becomes known, the
/// server will send a new message with the service's serving status.
///
/// If the call terminates with status UNIMPLEMENTED, then clients
/// should assume this method is not supported and should not retry the
/// call. If the call terminates with any other status (including OK),
/// clients should retry the call with appropriate exponential backoff.
/// </summary>
/// <param name="request">The request received from the client.</param>
/// <param name="responseStream">Used for sending responses back to the client.</param>
/// <param name="context">The context of the server-side call handler being invoked.</param>
/// <returns>A task indicating completion of the handler.</returns>
public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
{
string service = request.Service;
TaskCompletionSource<object> watchTcs = new TaskCompletionSource<object>();
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
lock (myLock)
{
var service = request.Service;
if (!watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
{
serverStreamWriters = new List<IServerStreamWriter<HealthCheckResponse>>();
watchers.Add(service, serverStreamWriters);
}
serverStreamWriters.Add(responseStream);
}
// Handle the Watch call being canceled
context.CancellationToken.Register(() => {
lock (myLock)
{
if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
{
// Remove the response stream from the watchers
if (serverStreamWriters.Remove(responseStream))
{
// Remove empty collection if service has no more response streams
if (serverStreamWriters.Count == 0)
{
watchers.Remove(service);
}
}
}
}
// Allow watch method to exit.
watchTcs.TrySetResult(null);
});
// Wait for call to be cancelled before exiting.
await watchTcs.Task;
}
private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
{
HealthCheckResponse response = null;
lock (myLock)
{
HealthCheckResponse.Types.ServingStatus status;
if (!statusMap.TryGetValue(service, out status))
{
// TODO(jtattermusch): returning specific status from server handler is not supported yet.
throw new RpcException(new Status(StatusCode.NotFound, ""));
if (throwOnNotFound)
{
// TODO(jtattermusch): returning specific status from server handler is not supported yet.
throw new RpcException(new Status(StatusCode.NotFound, ""));
}
else
{
status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
}
}
response = new HealthCheckResponse { Status = status };
}
return response;
}
private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
{
if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
{
return s;
}
else
{
return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
}
}
private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
{
if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
{
HealthCheckResponse response = new HealthCheckResponse { Status = status };
foreach (IServerStreamWriter<HealthCheckResponse> serverStreamWriter in serverStreamWriters)
{
// TODO(JamesNK): This will fail if a pending write is already in progress.
_ = serverStreamWriter.WriteAsync(response);
}
return Task.FromResult(new HealthCheckResponse { Status = status });
}
}
}

@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.4
# Visual Studio Version 16
VisualStudioVersion = 16.0.29505.145
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Core.Api", "Grpc.Core.Api\Grpc.Core.Api.csproj", "{63FCEA50-1505-11E9-B56E-0800200C9A66}"
EndProject
@ -51,7 +51,7 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{63FCEA50-1505-11E9-B56E-0800200C9A66}.Debug|Any CPU.Build.0 = Debug|Any CPU
{63FCEA50-1505-11E9-B56E-0800200C9A66}.Release|Any CPU.ActiveCfg = Release|Any CPU
{63FCEA50-1505-11E9-B56E-0800200C9A66}.Release|Any CPU.Build.0 = Release|Any CPU
@ -139,4 +139,7 @@ Global
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BF5C0B7B-764F-4668-A052-A12BCCDA7304}
EndGlobalSection
EndGlobal

Loading…
Cancel
Save