Merge pull request #21382 from JamesNK/jamesnk/fix-flakey-test

Fix flaky health check test
pull/21388/head
Jan Tattermusch 5 years ago committed by GitHub
commit 642e45a033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  2. 26
      src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs
  3. 7
      src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs

@ -201,18 +201,22 @@ namespace Grpc.HealthCheck.Tests
{
var cts = new CancellationTokenSource();
var context = new TestServerCallContext(cts.Token);
var writer = new TestResponseStreamWriter();
var writer = new TestResponseStreamWriter(started: false);
var impl = new HealthServiceImpl();
var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context);
// Write new 10 statuses. Only last 5 statuses will be returned when we read them from watch writer
// Write new statuses. Only last statuses will be returned when we read them from watch writer
for (var i = 0; i < HealthServiceImpl.MaxStatusBufferSize * 2; i++)
{
// These statuses aren't "valid" but it is useful for testing to have an incrementing number
impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i);
impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i + 10);
}
// Start reading responses now that statuses have been queued up
// This is to keep the test non-flakey
writer.Start();
// Read messages in a background task
var statuses = new List<HealthCheckResponse.Types.ServingStatus>();
var readStatusesTask = Task.Run(async () => {
@ -240,11 +244,11 @@ namespace Grpc.HealthCheck.Tests
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, statuses[0]);
// Last 5 queued messages
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)5, statuses[1]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)6, statuses[2]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)7, statuses[3]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)8, statuses[4]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)9, statuses[5]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)15, statuses[statuses.Count - 5]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)16, statuses[statuses.Count - 4]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)17, statuses[statuses.Count - 3]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)18, statuses[statuses.Count - 2]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)19, statuses[statuses.Count - 1]);
}
#endif

@ -25,24 +25,42 @@ namespace Grpc.HealthCheck.Tests
{
internal class TestResponseStreamWriter : IServerStreamWriter<HealthCheckResponse>
{
private Channel<HealthCheckResponse> _channel;
private readonly Channel<HealthCheckResponse> _channel;
private readonly TaskCompletionSource<object> _startTcs;
public TestResponseStreamWriter(int maxCapacity = 1)
public TestResponseStreamWriter(int maxCapacity = 1, bool started = true)
{
_channel = System.Threading.Channels.Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(maxCapacity) {
SingleReader = false,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
if (!started)
{
_startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
public ChannelReader<HealthCheckResponse> WrittenMessagesReader => _channel.Reader;
public WriteOptions WriteOptions { get; set; }
public Task WriteAsync(HealthCheckResponse message)
public async Task WriteAsync(HealthCheckResponse message)
{
return _channel.Writer.WriteAsync(message).AsTask();
if (_startTcs != null)
{
await _startTcs.Task;
}
await _channel.Writer.WriteAsync(message);
}
public void Start()
{
if (_startTcs != null)
{
_startTcs.TrySetResult(null);
}
}
public void Complete()

@ -157,9 +157,6 @@ namespace Grpc.HealthCheck
{
string service = request.Service;
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Channel is used to to marshall multiple callers updating status into a single queue.
// This is required because IServerStreamWriter is not thread safe.
//
@ -205,6 +202,10 @@ namespace Grpc.HealthCheck
channel.Writer.Complete();
});
// Send current status immediately
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Read messages. WaitToReadAsync will wait until new messages are available.
// Loop will exit when the call is canceled and the writer is marked as complete.
while (await channel.Reader.WaitToReadAsync())

Loading…
Cancel
Save