From 7b4d0b28c68617751a40112ca2f7ad518e3ecd7c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 14 Feb 2020 12:24:35 -0800 Subject: [PATCH] Implement health checking servicer in AsyncIO --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 2 + .../grpc_health/v1/BUILD.bazel | 5 +- .../grpc_health/v1/_async.py | 107 ++++++++ .../grpc_health/v1/health.py | 5 +- .../tests_aio/health_check/BUILD.bazel | 29 +++ .../tests_aio/health_check/__init__.py | 13 + .../health_check/health_servicer_test.py | 242 ++++++++++++++++++ 7 files changed, 401 insertions(+), 2 deletions(-) create mode 100644 src/python/grpcio_health_checking/grpc_health/v1/_async.py create mode 100644 src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests_aio/health_check/__init__.py create mode 100644 src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 903c20796f7..74b249bdf8f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -466,6 +466,8 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): ) except (KeyboardInterrupt, SystemExit): raise + except asyncio.CancelledError: + _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method())) except _ServerStoppedError: _LOGGER.info('Aborting RPC due to server stop.') except Exception as e: diff --git a/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel b/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel index 78a55a7f2db..ff7be4347ed 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel +++ b/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel @@ -16,7 +16,10 @@ py_grpc_library( py_library( name = "grpc_health", - srcs = ["health.py"], + srcs = [ + "health.py", + 
+        "_async.py",
+    ],
     imports = ["../../"],
     deps = [
         ":health_py_pb2",
diff --git a/src/python/grpcio_health_checking/grpc_health/v1/_async.py b/src/python/grpcio_health_checking/grpc_health/v1/_async.py
new file mode 100644
index 00000000000..fd2e7be3d9d
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc_health/v1/_async.py
@@ -0,0 +1,133 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Reference implementation for health checking in gRPC Python."""
+
+import logging
+import asyncio
+import collections
+
+import grpc
+from grpc.experimental import aio
+
+from grpc_health.v1 import health_pb2 as _health_pb2
+from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
+
+
+class AsyncHealthServicer(_health_pb2_grpc.HealthServicer):
+    """An AsyncIO implementation of health checking servicer.
+
+    Statuses are kept per service name. Every service that currently has at
+    least one Watch stream open owns one shared asyncio.Condition, which
+    _set() notifies so that all of its watchers see the update.
+    """
+
+    def __init__(self):
+        # Last status stored for each service name.
+        self._server_status = dict()
+        # One condition per watched service, created on first Watch.
+        self._server_watchers = collections.defaultdict(asyncio.Condition)
+        # Number of open Watch streams per service; the condition is dropped
+        # only when this reaches zero.
+        self._watcher_counts = collections.defaultdict(int)
+        self._gracefully_shutting_down = False
+
+    async def Check(self, request: _health_pb2.HealthCheckRequest, context):
+        """Returns the stored status of the requested service.
+
+        Aborts the RPC with NOT_FOUND when no status was ever set for it.
+        """
+        status = self._server_status.get(request.service)
+        logging.debug('Status %s, %s', request.service, status)
+
+        if status is None:
+            await context.abort(grpc.StatusCode.NOT_FOUND)
+        else:
+            return _health_pb2.HealthCheckResponse(status=status)
+
+    async def Watch(self, request: _health_pb2.HealthCheckRequest, context):
+        """Streams the status of a service until the RPC is torn down.
+
+        Writes the current status (SERVICE_UNKNOWN if none was set) and then
+        one more response after every notification from _set().
+        """
+        service = request.service
+        # Register before the first await so a concurrent _set() finds the
+        # condition and notifies it.
+        condition = self._server_watchers[service]
+        self._watcher_counts[service] += 1
+        try:
+            async with condition:
+                while True:
+                    # _set() stores the status under this same lock, so no
+                    # update can land between this read and wait() below.
+                    status = self._server_status.get(
+                        service,
+                        _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
+                    await context.write(
+                        _health_pb2.HealthCheckResponse(status=status))
+
+                    # Block until the next status change.
+                    await condition.wait()
+        finally:
+            # Other streams may still be waiting on the shared condition;
+            # only the last watcher to leave removes it.
+            self._watcher_counts[service] -= 1
+            if self._watcher_counts[service] <= 0:
+                del self._watcher_counts[service]
+                self._server_watchers.pop(service, None)
+
+    async def _set(self, service: str,
+                   status: _health_pb2.HealthCheckResponse.ServingStatus):
+        """Stores a status and wakes up the watchers of that service."""
+        # .get() avoids creating a condition for a service nobody watches.
+        condition = self._server_watchers.get(service)
+        if condition is None:
+            self._server_status[service] = status
+        else:
+            async with condition:
+                self._server_status[service] = status
+                condition.notify_all()
+
+    async def set(self, service: str,
+                  status: _health_pb2.HealthCheckResponse.ServingStatus):
+        """Sets the status of a service.
+
+        Args:
+          service: string, the name of the service. NOTE, '' must be set.
+          status: HealthCheckResponse.status enum value indicating the status of
+            the service
+        """
+        if self._gracefully_shutting_down:
+            return
+        else:
+            await self._set(service, status)
+
+    async def enter_graceful_shutdown(self):
+        """Permanently sets the status of all services to NOT_SERVING.
+
+        This should be invoked when the server is entering a graceful shutdown
+        period. After this method is invoked, future attempts to set the status
+        of a service will be ignored.
+
+        This is an EXPERIMENTAL API.
+        """
+        if self._gracefully_shutting_down:
+            return
+        else:
+            self._gracefully_shutting_down = True
+            # Iterate over a snapshot: _set() awaits, and the dict must not
+            # be iterated live across suspension points.
+            for service in list(self._server_status):
+                await self._set(service,
+                                _health_pb2.HealthCheckResponse.NOT_SERVING)
diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py
index 15494fafdbc..acbcd441580 100644
--- a/src/python/grpcio_health_checking/grpc_health/v1/health.py
+++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py
@@ -15,12 +15,15 @@
 
 import collections
 import threading
-
+import sys
 import grpc
 
 from grpc_health.v1 import health_pb2 as _health_pb2
 from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
 
+if sys.version_info[0] > 2:
+    from ._async import AsyncHealthServicer
+
 SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
 
 
diff --git a/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel b/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel
new file mode 100644
index 00000000000..fde3f69991f
--- /dev/null
+++ b/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel
@@ -0,0 +1,29 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+    name = "health_servicer_test",
+    size = "small",
+    srcs = ["health_servicer_test.py"],
+    imports = ["../../"],
+    python_version = "PY3",
+    deps = [
+        "//src/python/grpcio/grpc:grpcio",
+        "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health",
+        "//src/python/grpcio_tests/tests/unit/framework/common",
+        "//src/python/grpcio_tests/tests_aio/unit:_test_base",
+    ],
+)
diff --git a/src/python/grpcio_tests/tests_aio/health_check/__init__.py b/src/python/grpcio_tests/tests_aio/health_check/__init__.py
new file mode 100644
index 00000000000..1517f71d093
--- /dev/null
+++ b/src/python/grpcio_tests/tests_aio/health_check/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py b/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py
new file mode 100644
index 00000000000..0f686bb8e9c
--- /dev/null
+++ b/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py
@@ -0,0 +1,258 @@
+# Copyright 2016 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests AsyncIO version of grpcio-health-checking."""
+
+import asyncio
+import logging
+import time
+import unittest
+
+import grpc
+
+from grpc_health.v1 import health
+from grpc_health.v1 import health_pb2
+from grpc_health.v1 import health_pb2_grpc
+from grpc.experimental import aio
+
+from tests.unit.framework.common import test_constants
+
+from tests_aio.unit._test_base import AioTestBase
+
+_SERVING_SERVICE = 'grpc.test.TestServiceServing'
+_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
+_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
+_WATCH_SERVICE = 'grpc.test.WatchService'
+
+
+async def _pipe_to_queue(call, queue):
+    """Forwards every response read from `call` into `queue`."""
+    async for response in call:
+        await queue.put(response)
+
+
+class HealthServicerTest(AioTestBase):
+    """Exercises AsyncHealthServicer through an aio server and channel."""
+
+    async def setUp(self):
+        # Seeds four statuses, starts the server, connects a stub to its port.
+        self._servicer = health.AsyncHealthServicer()
+        await self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_SERVING_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
+        await self._servicer.set(_UNKNOWN_SERVICE,
+                                 health_pb2.HealthCheckResponse.UNKNOWN)
+        await self._servicer.set(_NOT_SERVING_SERVICE,
+                                 health_pb2.HealthCheckResponse.NOT_SERVING)
+        self._server = aio.server()
+        port = self._server.add_insecure_port('[::]:0')
+        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
+                                                     self._server)
+        await self._server.start()
+
+        self._channel = aio.insecure_channel('localhost:%d' % port)
+        self._stub = health_pb2_grpc.HealthStub(self._channel)
+
+    async def tearDown(self):
+        # Closes the channel before stopping the server.
+        await self._channel.close()
+        await self._server.stop(None)
+
+    async def test_check_empty_service(self):
+        # The '' service was set to SERVING in setUp.
+        request = health_pb2.HealthCheckRequest()
+        resp = await self._stub.Check(request)
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
+
+    async def test_check_serving_service(self):
+        # Check reports the SERVING status stored in setUp.
+        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
+        resp = await self._stub.Check(request)
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
+
+    async def test_check_unknown_service(self):
+        # UNKNOWN here is a stored status, not a missing service.
+        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
+        resp = await self._stub.Check(request)
+        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
+
+    async def test_check_not_serving_service(self):
+        # Check reports the NOT_SERVING status stored in setUp.
+        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
+        resp = await self._stub.Check(request)
+        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
+                         resp.status)
+
+    async def test_check_not_found_service(self):
+        # A service that was never set fails the RPC with NOT_FOUND.
+        request = health_pb2.HealthCheckRequest(service='not-found')
+        with self.assertRaises(grpc.RpcError) as context:
+            await self._stub.Check(request)
+
+        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
+
+    async def test_health_service_name(self):
+        # Pins the fully qualified name exported as health.SERVICE_NAME.
+        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
+
+    async def test_watch_empty_service(self):
+        # Watch first yields the current status of the '' service.
+        request = health_pb2.HealthCheckRequest(service='')
+
+        call = self._stub.Watch(request)
+        queue = asyncio.Queue()
+        task = self.loop.create_task(_pipe_to_queue(call, queue))
+
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+                         response.status)
+
+        call.cancel()
+        await task
+        self.assertTrue(queue.empty())
+
+    async def test_watch_new_service(self):
+        # A never-set service starts as SERVICE_UNKNOWN, then follows set().
+        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+        call = self._stub.Watch(request)
+        queue = asyncio.Queue()
+        task = self.loop.create_task(_pipe_to_queue(call, queue))
+
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+                         response.status)
+
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+                         response.status)
+
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.NOT_SERVING)
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
+                         response.status)
+
+        call.cancel()
+        await task
+        self.assertTrue(queue.empty())
+
+    async def test_watch_service_isolation(self):
+        # Setting a different service must not produce a Watch response.
+        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+        call = self._stub.Watch(request)
+        queue = asyncio.Queue()
+        task = self.loop.create_task(_pipe_to_queue(call, queue))
+
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+                         response.status)
+
+        await self._servicer.set('some-other-service',
+                                 health_pb2.HealthCheckResponse.SERVING)
+        with self.assertRaises(asyncio.TimeoutError):
+            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)
+
+        call.cancel()
+        await task
+        self.assertTrue(queue.empty())
+
+    async def test_two_watchers(self):
+        # Both watchers of one service receive the same update.
+        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+        queue1 = asyncio.Queue()
+        queue2 = asyncio.Queue()
+        call1 = self._stub.Watch(request)
+        call2 = self._stub.Watch(request)
+        task1 = self.loop.create_task(_pipe_to_queue(call1, queue1))
+        task2 = self.loop.create_task(_pipe_to_queue(call2, queue2))
+
+        response1 = await queue1.get()
+        response2 = await queue2.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+                         response1.status)
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+                         response2.status)
+
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
+        response1 = await queue1.get()
+        response2 = await queue2.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+                         response1.status)
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+                         response2.status)
+
+        call1.cancel()
+        call2.cancel()
+        await task1
+        await task2
+        self.assertTrue(queue1.empty())
+        self.assertTrue(queue2.empty())
+
+    async def test_cancelled_watch_removed_from_watch_list(self):
+        # After the only watcher cancels, _server_watchers must drain.
+        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+        call = self._stub.Watch(request)
+        queue = asyncio.Queue()
+        task = self.loop.create_task(_pipe_to_queue(call, queue))
+
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+                         response.status)
+
+        call.cancel()
+        await self._servicer.set(_WATCH_SERVICE,
+                                 health_pb2.HealthCheckResponse.SERVING)
+        await task
+
+        # Wait for the serving coroutine to process client cancellation.
+        timeout = time.time() + test_constants.TIME_ALLOWANCE
+        while (time.time() < timeout and self._servicer._server_watchers):
+            await asyncio.sleep(1)
+        self.assertFalse(self._servicer._server_watchers,
+                         'There should not be any watcher left')
+        self.assertTrue(queue.empty())
+
+    async def test_graceful_shutdown(self):
+        # After enter_graceful_shutdown, set() no longer changes status.
+        request = health_pb2.HealthCheckRequest(service='')
+        call = self._stub.Watch(request)
+        queue = asyncio.Queue()
+        task = self.loop.create_task(_pipe_to_queue(call, queue))
+
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+                         response.status)
+
+        await self._servicer.enter_graceful_shutdown()
+        response = await queue.get()
+        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
+                         response.status)
+
+        # This should be a no-op.
+        await self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+
+        resp = await self._stub.Check(request)
+        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
+                         resp.status)
+
+        call.cancel()
+        await task
+        self.assertTrue(queue.empty())
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.DEBUG)
+    unittest.main(verbosity=2)