Fix flaky health check test

pull/21382/head
James Newton-King 6 years ago
parent 683986ae6c
commit 9cedb80c6f
No known key found for this signature in database
GPG Key ID: A66B2F456BF5526
  1. 20
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  2. 26
      src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs
  3. 7
      src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs

@ -201,18 +201,22 @@ namespace Grpc.HealthCheck.Tests
{ {
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var context = new TestServerCallContext(cts.Token); var context = new TestServerCallContext(cts.Token);
var writer = new TestResponseStreamWriter(); var writer = new TestResponseStreamWriter(started: false);
var impl = new HealthServiceImpl(); var impl = new HealthServiceImpl();
var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context); var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context);
// Write new 10 statuses. Only last 5 statuses will be returned when we read them from watch writer // Write new statuses. Only last statuses will be returned when we read them from watch writer
for (var i = 0; i < HealthServiceImpl.MaxStatusBufferSize * 2; i++) for (var i = 0; i < HealthServiceImpl.MaxStatusBufferSize * 2; i++)
{ {
// These statuses aren't "valid" but it is useful for testing to have an incrementing number // These statuses aren't "valid" but it is useful for testing to have an incrementing number
impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i); impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i + 10);
} }
// Start reading responses now that statuses have been queued up
// This is to keep the test non-flakey
writer.Start();
// Read messages in a background task // Read messages in a background task
var statuses = new List<HealthCheckResponse.Types.ServingStatus>(); var statuses = new List<HealthCheckResponse.Types.ServingStatus>();
var readStatusesTask = Task.Run(async () => { var readStatusesTask = Task.Run(async () => {
@ -240,11 +244,11 @@ namespace Grpc.HealthCheck.Tests
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, statuses[0]); Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, statuses[0]);
// Last 5 queued messages // Last 5 queued messages
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)5, statuses[1]); Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)15, statuses[statuses.Count - 5]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)6, statuses[2]); Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)16, statuses[statuses.Count - 4]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)7, statuses[3]); Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)17, statuses[statuses.Count - 3]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)8, statuses[4]); Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)18, statuses[statuses.Count - 2]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)9, statuses[5]); Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)19, statuses[statuses.Count - 1]);
} }
#endif #endif

@ -25,24 +25,42 @@ namespace Grpc.HealthCheck.Tests
{ {
internal class TestResponseStreamWriter : IServerStreamWriter<HealthCheckResponse> internal class TestResponseStreamWriter : IServerStreamWriter<HealthCheckResponse>
{ {
private Channel<HealthCheckResponse> _channel; private readonly Channel<HealthCheckResponse> _channel;
private readonly TaskCompletionSource<object> _startTcs;
public TestResponseStreamWriter(int maxCapacity = 1) public TestResponseStreamWriter(int maxCapacity = 1, bool started = true)
{ {
_channel = System.Threading.Channels.Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(maxCapacity) { _channel = System.Threading.Channels.Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(maxCapacity) {
SingleReader = false, SingleReader = false,
SingleWriter = true, SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait FullMode = BoundedChannelFullMode.Wait
}); });
if (!started)
{
_startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
} }
public ChannelReader<HealthCheckResponse> WrittenMessagesReader => _channel.Reader; public ChannelReader<HealthCheckResponse> WrittenMessagesReader => _channel.Reader;
public WriteOptions WriteOptions { get; set; } public WriteOptions WriteOptions { get; set; }
public Task WriteAsync(HealthCheckResponse message) public async Task WriteAsync(HealthCheckResponse message)
{ {
return _channel.Writer.WriteAsync(message).AsTask(); if (_startTcs != null)
{
await _startTcs.Task;
}
await _channel.Writer.WriteAsync(message);
}
public void Start()
{
if (_startTcs != null)
{
_startTcs.TrySetResult(null);
}
} }
public void Complete() public void Complete()

@ -157,9 +157,6 @@ namespace Grpc.HealthCheck
{ {
string service = request.Service; string service = request.Service;
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Channel is used to to marshall multiple callers updating status into a single queue. // Channel is used to to marshall multiple callers updating status into a single queue.
// This is required because IServerStreamWriter is not thread safe. // This is required because IServerStreamWriter is not thread safe.
// //
@ -205,6 +202,10 @@ namespace Grpc.HealthCheck
channel.Writer.Complete(); channel.Writer.Complete();
}); });
// Send current status immediately
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Read messages. WaitToReadAsync will wait until new messages are available. // Read messages. WaitToReadAsync will wait until new messages are available.
// Loop will exit when the call is canceled and the writer is marked as complete. // Loop will exit when the call is canceled and the writer is marked as complete.
while (await channel.Reader.WaitToReadAsync()) while (await channel.Reader.WaitToReadAsync())

Loading…
Cancel
Save