diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index b08297a5d71..15494fafdbc 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -82,6 +82,7 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): self._send_response_callbacks = {} self.Watch.__func__.experimental_non_blocking = experimental_non_blocking self.Watch.__func__.experimental_thread_pool = experimental_thread_pool + self._gracefully_shutting_down = False def _on_close_callback(self, send_response_callback, service): @@ -135,9 +136,30 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): the service """ with self._lock: - self._server_status[service] = status - if service in self._send_response_callbacks: - for send_response_callback in self._send_response_callbacks[ - service]: - send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) + if self._gracefully_shutting_down: + return + else: + self._server_status[service] = status + if service in self._send_response_callbacks: + for send_response_callback in self._send_response_callbacks[ + service]: + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) + + def enter_graceful_shutdown(self): + """Permanently sets the status of all services to NOT_SERVING. + + This should be invoked when the server is entering a graceful shutdown + period. After this method is invoked, future attempts to set the status + of a service will be ignored. + + This is an EXPERIMENTAL API. + """ + with self._lock: + if self._gracefully_shutting_down: + return + else: + for service in self._server_status: + self.set(service, + _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member + self._gracefully_shutting_down = True diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index f92596f5c9b..b8da700fada 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -194,7 +194,7 @@ class BaseWatchTests(object): thread.join() # Wait, if necessary, for serving thread to process client cancellation - timeout = time.time() + test_constants.SHORT_TIMEOUT + timeout = time.time() + test_constants.TIME_ALLOWANCE while time.time( ) < timeout and self._servicer._send_response_callbacks[_WATCH_SERVICE]: time.sleep(1) @@ -203,6 +203,30 @@ class BaseWatchTests(object): 'watch set should be empty') self.assertTrue(response_queue.empty()) + def test_graceful_shutdown(self): + request = health_pb2.HealthCheckRequest(service='') + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + self._servicer.enter_graceful_shutdown() + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + response.status) + + # This should be a no-op. + self._servicer.set('', health_pb2.HealthCheckResponse.SERVING) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + class HealthServicerTest(BaseWatchTests.WatchTests):