diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 903c20796f7..74b249bdf8f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -466,6 +466,8 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): ) except (KeyboardInterrupt, SystemExit): raise + except asyncio.CancelledError: + _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method())) except _ServerStoppedError: _LOGGER.info('Aborting RPC due to server stop.') except Exception as e: diff --git a/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel b/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel index 78a55a7f2db..fe4aaed7aa8 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel +++ b/src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel @@ -16,7 +16,10 @@ py_grpc_library( py_library( name = "grpc_health", - srcs = ["health.py"], + srcs = [ + "_async.py", + "health.py", + ], imports = ["../../"], deps = [ ":health_py_pb2", diff --git a/src/python/grpcio_health_checking/grpc_health/v1/_async.py b/src/python/grpcio_health_checking/grpc_health/v1/_async.py new file mode 100644 index 00000000000..859cadbd642 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc_health/v1/_async.py @@ -0,0 +1,113 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Reference implementation for health checking in gRPC Python.""" + +import asyncio +import collections +from typing import MutableMapping +import grpc + +from grpc_health.v1 import health_pb2 as _health_pb2 +from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc + + +class HealthServicer(_health_pb2_grpc.HealthServicer): + """An AsyncIO implementation of health checking servicer.""" + _server_status: MutableMapping[ + str, '_health_pb2.HealthCheckResponse.ServingStatus'] + _server_watchers: MutableMapping[str, asyncio.Condition] + _gracefully_shutting_down: bool + + def __init__(self) -> None: + self._server_status = dict() + self._server_watchers = collections.defaultdict(asyncio.Condition) + self._gracefully_shutting_down = False + + async def Check(self, request: _health_pb2.HealthCheckRequest, + context) -> None: + status = self._server_status.get(request.service) + + if status is None: + await context.abort(grpc.StatusCode.NOT_FOUND) + else: + return _health_pb2.HealthCheckResponse(status=status) + + async def Watch(self, request: _health_pb2.HealthCheckRequest, + context) -> None: + condition = self._server_watchers[request.service] + last_status = None + try: + async with condition: + while True: + status = self._server_status.get( + request.service, + _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN) + + # NOTE(lidiz) If the observed status is the same, it means + # there are missing intermediate statuses. It's considered + # acceptable since peer only interested in eventual status. + if status != last_status: + # Responds with current health state + await context.write( + _health_pb2.HealthCheckResponse(status=status)) + + # Records the last sent status + last_status = status + + # Polling on health state changes + await condition.wait() + finally: + if request.service in self._server_watchers: + del self._server_watchers[request.service] + + async def _set(self, service: str, + status: _health_pb2.HealthCheckResponse.ServingStatus + ) -> None: + if service in self._server_watchers: + condition = self._server_watchers.get(service) + async with condition: + self._server_status[service] = status + condition.notify_all() + else: + self._server_status[service] = status + + async def set(self, service: str, + status: _health_pb2.HealthCheckResponse.ServingStatus + ) -> None: + """Sets the status of a service. + + Args: + service: string, the name of the service. + status: HealthCheckResponse.status enum value indicating the status of + the service + """ + if self._gracefully_shutting_down: + return + else: + await self._set(service, status) + + async def enter_graceful_shutdown(self) -> None: + """Permanently sets the status of all services to NOT_SERVING. + + This should be invoked when the server is entering a graceful shutdown + period. After this method is invoked, future attempts to set the status + of a service will be ignored. + """ + if self._gracefully_shutting_down: + return + else: + self._gracefully_shutting_down = True + for service in self._server_status: + await self._set(service, + _health_pb2.HealthCheckResponse.NOT_SERVING) diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 15494fafdbc..05a16c73380 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -15,13 +15,20 @@ import collections import threading - +import sys import grpc from grpc_health.v1 import health_pb2 as _health_pb2 from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc +if sys.version_info[0] >= 3 and sys.version_info[1] >= 6: + # Exposes AsyncHealthServicer as public API. + from . import _async as aio # pylint: disable=unused-import + +# The service name of the health checking servicer. SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name +# The entry of overall health for the entire server. +OVERALL_HEALTH = '' class _Watcher(): @@ -131,7 +138,7 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): """Sets the status of a service. Args: - service: string, the name of the service. NOTE, '' must be set. + service: string, the name of the service. status: HealthCheckResponse.status enum value indicating the status of the service """ diff --git a/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel b/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel new file mode 100644 index 00000000000..4e1b088d0b8 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel @@ -0,0 +1,29 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package(default_testonly = 1) + +py_test( + name = "health_servicer_test", + size = "small", + srcs = ["health_servicer_test.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health", + "//src/python/grpcio_tests/tests/unit/framework/common", + "//src/python/grpcio_tests/tests_aio/unit:_test_base", + ], +) diff --git a/src/python/grpcio_tests/tests_aio/health_check/__init__.py b/src/python/grpcio_tests/tests_aio/health_check/__init__.py new file mode 100644 index 00000000000..1517f71d093 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/health_check/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py b/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py new file mode 100644 index 00000000000..71166a2beff --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py @@ -0,0 +1,262 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests AsyncIO version of grpcio-health-checking.""" + +import asyncio +import logging +import time +import random +import unittest + +import grpc + +from grpc_health.v1 import health +from grpc_health.v1 import health_pb2 +from grpc_health.v1 import health_pb2_grpc +from grpc.experimental import aio + +from tests.unit.framework.common import test_constants + +from tests_aio.unit._test_base import AioTestBase + +_SERVING_SERVICE = 'grpc.test.TestServiceServing' +_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown' +_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing' +_WATCH_SERVICE = 'grpc.test.WatchService' + +_LARGE_NUMBER_OF_STATUS_CHANGES = 1000 + + +async def _pipe_to_queue(call, queue): + async for response in call: + await queue.put(response) + + +class HealthServicerTest(AioTestBase): + + async def setUp(self): + self._servicer = health.aio.HealthServicer() + await self._servicer.set(health.OVERALL_HEALTH, + health_pb2.HealthCheckResponse.SERVING) + await self._servicer.set(_SERVING_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + await self._servicer.set(_UNKNOWN_SERVICE, + health_pb2.HealthCheckResponse.UNKNOWN) + await self._servicer.set(_NOT_SERVING_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) + self._server = aio.server() + port = self._server.add_insecure_port('[::]:0') + health_pb2_grpc.add_HealthServicer_to_server(self._servicer, + self._server) + await self._server.start() + + self._channel = aio.insecure_channel('localhost:%d' % port) + self._stub = health_pb2_grpc.HealthStub(self._channel) + + async def tearDown(self): + await self._channel.close() + await self._server.stop(None) + + async def test_check_empty_service(self): + request = health_pb2.HealthCheckRequest() + resp = await self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) + + async def test_check_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE) + resp = await self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) + + async def test_check_unknown_service(self): + request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE) + resp = await self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) + + async def test_check_not_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE) + resp = await self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + resp.status) + + async def test_check_not_found_service(self): + request = health_pb2.HealthCheckRequest(service='not-found') + with self.assertRaises(aio.AioRpcError) as context: + await self._stub.Check(request) + + self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) + + async def test_health_service_name(self): + self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health') + + async def test_watch_empty_service(self): + request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH) + + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + (await queue.get()).status) + + call.cancel() + await task + self.assertTrue(queue.empty()) + + async def test_watch_new_service(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue.get()).status) + + await self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + (await queue.get()).status) + + await self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + (await queue.get()).status) + + call.cancel() + await task + self.assertTrue(queue.empty()) + + async def test_watch_service_isolation(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue.get()).status) + + await self._servicer.set('some-other-service', + health_pb2.HealthCheckResponse.SERVING) + # The change of health status in other service should be isolated. + # Hence, no additional notification should be observed. + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT) + + call.cancel() + await task + self.assertTrue(queue.empty()) + + async def test_two_watchers(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + queue1 = asyncio.Queue() + queue2 = asyncio.Queue() + call1 = self._stub.Watch(request) + call2 = self._stub.Watch(request) + task1 = self.loop.create_task(_pipe_to_queue(call1, queue1)) + task2 = self.loop.create_task(_pipe_to_queue(call2, queue2)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue1.get()).status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue2.get()).status) + + await self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + (await queue1.get()).status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + (await queue2.get()).status) + + call1.cancel() + call2.cancel() + await task1 + await task2 + self.assertTrue(queue1.empty()) + self.assertTrue(queue2.empty()) + + async def test_cancelled_watch_removed_from_watch_list(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue.get()).status) + + call.cancel() + await self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + await task + + # Wait for the serving coroutine to process client cancellation. + timeout = time.monotonic() + test_constants.TIME_ALLOWANCE + while (time.monotonic() < timeout and self._servicer._server_watchers): + await asyncio.sleep(1) + self.assertFalse(self._servicer._server_watchers, + 'There should not be any watcher left') + self.assertTrue(queue.empty()) + + async def test_graceful_shutdown(self): + request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH) + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + (await queue.get()).status) + + await self._servicer.enter_graceful_shutdown() + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + (await queue.get()).status) + + # This should be a no-op. + await self._servicer.set(health.OVERALL_HEALTH, + health_pb2.HealthCheckResponse.SERVING) + + resp = await self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + resp.status) + + call.cancel() + await task + self.assertTrue(queue.empty()) + + async def test_no_duplicate_status(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + call = self._stub.Watch(request) + queue = asyncio.Queue() + task = self.loop.create_task(_pipe_to_queue(call, queue)) + + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + (await queue.get()).status) + last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN + + for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES): + if random.randint(0, 1) == 0: + status = health_pb2.HealthCheckResponse.SERVING + else: + status = health_pb2.HealthCheckResponse.NOT_SERVING + + await self._servicer.set(_WATCH_SERVICE, status) + if status != last_status: + self.assertEqual(status, (await queue.get()).status) + last_status = status + + call.cancel() + await task + self.assertTrue(queue.empty()) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index e05d64ac474..ec21a93476e 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,5 +1,6 @@ [ "_sanity._sanity_test.AioSanityTest", + "health_check.health_servicer_test.HealthServicerTest", "interop.local_interop_test.InsecureLocalInteropTest", "interop.local_interop_test.SecureLocalInteropTest", "unit.abort_test.TestAbort",