From a2495502df6133bdd939be6ef83ce417249e7ec3 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 20 Feb 2019 15:54:11 -0800 Subject: [PATCH 1/5] add enter_graceful_shutdown() to health service --- .../grpc_health/v1/health.py | 17 +++++++++++++ .../health_check/_health_servicer_test.py | 24 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index b08297a5d71..04ea3b4ecfb 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -82,6 +82,7 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): self._send_response_callbacks = {} self.Watch.__func__.experimental_non_blocking = experimental_non_blocking self.Watch.__func__.experimental_thread_pool = experimental_thread_pool + self._gracefully_shutting_down = False def _on_close_callback(self, send_response_callback, service): @@ -135,9 +136,25 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): the service """ with self._lock: + if self._gracefully_shutting_down: + return self._server_status[service] = status if service in self._send_response_callbacks: for send_response_callback in self._send_response_callbacks[ service]: send_response_callback( _health_pb2.HealthCheckResponse(status=status)) + + def enter_graceful_shutdown(self): + """Permanently sets the status of all services to NOT_SERVING. + + This should be invoked when the server is entering a graceful shutdown + period. After this method is invoked, future attempts to set the status + of a service will be ignored. + """ + with self._lock: + if self._gracefully_shutting_down: + return + for service in self._server_status: + self.set(service, _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member + self._gracefully_shutting_down = True diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index f92596f5c9b..62eef8e2291 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -203,6 +203,30 @@ class BaseWatchTests(object): 'watch set should be empty') self.assertTrue(response_queue.empty()) + def test_graceful_shutdown(self): + request = health_pb2.HealthCheckRequest(service='') + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + self._servicer.enter_graceful_shutdown() + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + response.status) + + # This should be a no-op. + self._servicer.set('', health_pb2.HealthCheckResponse.SERVING) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + class HealthServicerTest(BaseWatchTests.WatchTests): From fbc4ea7d8efa1081613fb79ac3d66331c8381670 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 20 Feb 2019 16:00:11 -0800 Subject: [PATCH 2/5] mark as experimental --- src/python/grpcio_health_checking/grpc_health/v1/health.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 04ea3b4ecfb..3d8c16ee1b6 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -151,6 +151,8 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): This should be invoked when the server is entering a graceful shutdown period. After this method is invoked, future attempts to set the status of a service will be ignored. + + This is an EXPERIMENTAL API. """ with self._lock: if self._gracefully_shutting_down: From ab5b28538f16daed9ccd857b87714df49c2a4712 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 20 Feb 2019 16:20:32 -0800 Subject: [PATCH 3/5] use else: --- .../grpc_health/v1/health.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 3d8c16ee1b6..dc889cdc774 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -118,13 +118,15 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): status = self._server_status.get(service) if status is None: status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member - send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) - if service not in self._send_response_callbacks: - self._send_response_callbacks[service] = set() - self._send_response_callbacks[service].add(send_response_callback) - context.add_callback( - self._on_close_callback(send_response_callback, service)) + else: + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) + if service not in self._send_response_callbacks: + self._send_response_callbacks[service] = set() + self._send_response_callbacks[service].add( + send_response_callback) + context.add_callback( + self._on_close_callback(send_response_callback, service)) return blocking_watcher def set(self, service, status): @@ -157,6 +159,8 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): with self._lock: if self._gracefully_shutting_down: return - for service in self._server_status: - self.set(service, _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member - self._gracefully_shutting_down = True + else: + for service in self._server_status: + self.set(service, + _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member + self._gracefully_shutting_down = True From 93ef0db86b4eee95ae105f1436951e3b2f349886 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 20 Feb 2019 17:11:42 -0800 Subject: [PATCH 4/5] use else in right spot --- .../grpc_health/v1/health.py | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index dc889cdc774..15494fafdbc 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -118,15 +118,13 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): status = self._server_status.get(service) if status is None: status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member - else: - send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) - if service not in self._send_response_callbacks: - self._send_response_callbacks[service] = set() - self._send_response_callbacks[service].add( - send_response_callback) - context.add_callback( - self._on_close_callback(send_response_callback, service)) + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) + if service not in self._send_response_callbacks: + self._send_response_callbacks[service] = set() + self._send_response_callbacks[service].add(send_response_callback) + context.add_callback( + self._on_close_callback(send_response_callback, service)) return blocking_watcher def set(self, service, status): @@ -140,12 +138,13 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): with self._lock: if self._gracefully_shutting_down: return - self._server_status[service] = status - if service in self._send_response_callbacks: - for send_response_callback in self._send_response_callbacks[ - service]: - send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) + else: + self._server_status[service] = status + if service in self._send_response_callbacks: + for send_response_callback in self._send_response_callbacks[ + service]: + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) def enter_graceful_shutdown(self): """Permanently sets the status of all services to NOT_SERVING. From af3d32214c06a77f64db1661ed56fa8c9f86340d Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 20 Feb 2019 19:44:13 -0800 Subject: [PATCH 5/5] increase timeout --- .../grpcio_tests/tests/health_check/_health_servicer_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index 62eef8e2291..b8da700fada 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -194,7 +194,7 @@ class BaseWatchTests(object): thread.join() # Wait, if necessary, for serving thread to process client cancellation - timeout = time.time() + test_constants.SHORT_TIMEOUT + timeout = time.time() + test_constants.TIME_ALLOWANCE while time.time( ) < timeout and self._servicer._send_response_callbacks[_WATCH_SERVICE]: time.sleep(1)