Add watchers lock

pull/21120/head
James Newton-King 6 years ago
parent 48ba78a7de
commit dbc0bcc91d
No known key found for this signature in database
GPG Key ID: A66B2F456BF5526
  1. 67
      src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs

@ -39,7 +39,9 @@ namespace Grpc.HealthCheck
/// </summary> /// </summary>
public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
{ {
private readonly object myLock = new object(); private readonly object statusLock = new object();
private readonly object watchersLock = new object();
private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap = private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
new Dictionary<string, HealthCheckResponse.Types.ServingStatus>(); new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();
private readonly Dictionary<string, List<IServerStreamWriter<HealthCheckResponse>>> watchers = private readonly Dictionary<string, List<IServerStreamWriter<HealthCheckResponse>>> watchers =
@ -52,15 +54,16 @@ namespace Grpc.HealthCheck
/// <param name="status">the health status</param> /// <param name="status">the health status</param>
public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status) public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
{ {
lock (myLock) HealthCheckResponse.Types.ServingStatus previousStatus;
lock (statusLock)
{ {
HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service); previousStatus = GetServiceStatus(service);
statusMap[service] = status; statusMap[service] = status;
}
if (status != previousStatus) if (status != previousStatus)
{ {
NotifyStatus(service, status); NotifyStatus(service, status);
}
} }
} }
@ -70,15 +73,16 @@ namespace Grpc.HealthCheck
/// <param name="service">The service. Cannot be null.</param> /// <param name="service">The service. Cannot be null.</param>
public void ClearStatus(string service) public void ClearStatus(string service)
{ {
lock (myLock) HealthCheckResponse.Types.ServingStatus previousStatus;
lock (statusLock)
{ {
HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service); previousStatus = GetServiceStatus(service);
statusMap.Remove(service); statusMap.Remove(service);
}
if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown) if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
{ {
NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown); NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
}
} }
} }
@ -87,18 +91,18 @@ namespace Grpc.HealthCheck
/// </summary> /// </summary>
public void ClearAll() public void ClearAll()
{ {
lock (myLock) List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses;
lock (statusLock)
{ {
List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses = statusMap.ToList(); statuses = statusMap.ToList();
statusMap.Clear(); statusMap.Clear();
}
foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses) foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
{
if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
{ {
if (status.Value != HealthCheckResponse.Types.ServingStatus.Unknown) NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
{
NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.Unknown);
}
} }
} }
} }
@ -145,7 +149,7 @@ namespace Grpc.HealthCheck
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false); HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response); await responseStream.WriteAsync(response);
lock (myLock) lock (watchersLock)
{ {
if (!watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters)) if (!watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
{ {
@ -158,7 +162,7 @@ namespace Grpc.HealthCheck
// Handle the Watch call being canceled // Handle the Watch call being canceled
context.CancellationToken.Register(() => { context.CancellationToken.Register(() => {
lock (myLock) lock (watchersLock)
{ {
if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters)) if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
{ {
@ -185,7 +189,7 @@ namespace Grpc.HealthCheck
private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound) private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
{ {
HealthCheckResponse response = null; HealthCheckResponse response = null;
lock (myLock) lock (statusLock)
{ {
HealthCheckResponse.Types.ServingStatus status; HealthCheckResponse.Types.ServingStatus status;
if (!statusMap.TryGetValue(service, out status)) if (!statusMap.TryGetValue(service, out status))
@ -220,14 +224,17 @@ namespace Grpc.HealthCheck
private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status) private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
{ {
if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters)) lock (watchersLock)
{ {
HealthCheckResponse response = new HealthCheckResponse { Status = status }; if (watchers.TryGetValue(service, out List<IServerStreamWriter<HealthCheckResponse>> serverStreamWriters))
foreach (IServerStreamWriter<HealthCheckResponse> serverStreamWriter in serverStreamWriters)
{ {
// TODO(JamesNK): This will fail if a pending write is already in progress. HealthCheckResponse response = new HealthCheckResponse { Status = status };
_ = serverStreamWriter.WriteAsync(response);
foreach (IServerStreamWriter<HealthCheckResponse> serverStreamWriter in serverStreamWriters)
{
// TODO(JamesNK): This will fail if a pending write is already in progress.
_ = serverStreamWriter.WriteAsync(response);
}
} }
} }
} }

Loading…
Cancel
Save