Merge pull request #22028 from lidizheng/aio-health-checking-2

[Aio] Implement health checking servicer in AsyncIO
pull/22066/head
Lidi Zheng 5 years ago committed by GitHub
commit 4c14e507cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  2. 5
      src/python/grpcio_health_checking/grpc_health/v1/BUILD.bazel
  3. 113
      src/python/grpcio_health_checking/grpc_health/v1/_async.py
  4. 11
      src/python/grpcio_health_checking/grpc_health/v1/health.py
  5. 29
      src/python/grpcio_tests/tests_aio/health_check/BUILD.bazel
  6. 13
      src/python/grpcio_tests/tests_aio/health_check/__init__.py
  7. 262
      src/python/grpcio_tests/tests_aio/health_check/health_servicer_test.py
  8. 1
      src/python/grpcio_tests/tests_aio/tests.json

@ -466,6 +466,8 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
) )
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
raise raise
except asyncio.CancelledError:
_LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method()))
except _ServerStoppedError: except _ServerStoppedError:
_LOGGER.info('Aborting RPC due to server stop.') _LOGGER.info('Aborting RPC due to server stop.')
except Exception as e: except Exception as e:

@ -16,7 +16,10 @@ py_grpc_library(
py_library( py_library(
name = "grpc_health", name = "grpc_health",
srcs = ["health.py"], srcs = [
"_async.py",
"health.py",
],
imports = ["../../"], imports = ["../../"],
deps = [ deps = [
":health_py_pb2", ":health_py_pb2",

@ -0,0 +1,113 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for health checking in gRPC Python."""
import asyncio
import collections
from typing import MutableMapping
import grpc
from grpc_health.v1 import health_pb2 as _health_pb2
from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
class HealthServicer(_health_pb2_grpc.HealthServicer):
"""An AsyncIO implementation of health checking servicer."""
_server_status: MutableMapping[
str, '_health_pb2.HealthCheckResponse.ServingStatus']
_server_watchers: MutableMapping[str, asyncio.Condition]
_gracefully_shutting_down: bool
def __init__(self) -> None:
self._server_status = dict()
self._server_watchers = collections.defaultdict(asyncio.Condition)
self._gracefully_shutting_down = False
async def Check(self, request: _health_pb2.HealthCheckRequest,
context) -> None:
status = self._server_status.get(request.service)
if status is None:
await context.abort(grpc.StatusCode.NOT_FOUND)
else:
return _health_pb2.HealthCheckResponse(status=status)
async def Watch(self, request: _health_pb2.HealthCheckRequest,
context) -> None:
condition = self._server_watchers[request.service]
last_status = None
try:
async with condition:
while True:
status = self._server_status.get(
request.service,
_health_pb2.HealthCheckResponse.SERVICE_UNKNOWN)
# NOTE(lidiz) If the observed status is the same, it means
# there are missing intermediate statuses. It's considered
# acceptable since peer only interested in eventual status.
if status != last_status:
# Responds with current health state
await context.write(
_health_pb2.HealthCheckResponse(status=status))
# Records the last sent status
last_status = status
# Polling on health state changes
await condition.wait()
finally:
if request.service in self._server_watchers:
del self._server_watchers[request.service]
async def _set(self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus
) -> None:
if service in self._server_watchers:
condition = self._server_watchers.get(service)
async with condition:
self._server_status[service] = status
condition.notify_all()
else:
self._server_status[service] = status
async def set(self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus
) -> None:
"""Sets the status of a service.
Args:
service: string, the name of the service.
status: HealthCheckResponse.status enum value indicating the status of
the service
"""
if self._gracefully_shutting_down:
return
else:
await self._set(service, status)
async def enter_graceful_shutdown(self) -> None:
"""Permanently sets the status of all services to NOT_SERVING.
This should be invoked when the server is entering a graceful shutdown
period. After this method is invoked, future attempts to set the status
of a service will be ignored.
"""
if self._gracefully_shutting_down:
return
else:
self._gracefully_shutting_down = True
for service in self._server_status:
await self._set(service,
_health_pb2.HealthCheckResponse.NOT_SERVING)

@ -15,13 +15,20 @@
import collections import collections
import threading import threading
import sys
import grpc import grpc
from grpc_health.v1 import health_pb2 as _health_pb2 from grpc_health.v1 import health_pb2 as _health_pb2
from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
if sys.version_info[0] >= 3 and sys.version_info[1] >= 6:
# Exposes AsyncHealthServicer as public API.
from . import _async as aio # pylint: disable=unused-import
# The service name of the health checking servicer.
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
# The entry of overall health for the entire server.
OVERALL_HEALTH = ''
class _Watcher(): class _Watcher():
@ -131,7 +138,7 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
"""Sets the status of a service. """Sets the status of a service.
Args: Args:
service: string, the name of the service. NOTE, '' must be set. service: string, the name of the service.
status: HealthCheckResponse.status enum value indicating the status of status: HealthCheckResponse.status enum value indicating the status of
the service the service
""" """

@ -0,0 +1,29 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
package(default_testonly = 1)
py_test(
name = "health_servicer_test",
size = "small",
srcs = ["health_servicer_test.py"],
imports = ["../../"],
python_version = "PY3",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_health_checking/grpc_health/v1:grpc_health",
"//src/python/grpcio_tests/tests/unit/framework/common",
"//src/python/grpcio_tests/tests_aio/unit:_test_base",
],
)

@ -0,0 +1,13 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,262 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests AsyncIO version of grpcio-health-checking."""
import asyncio
import logging
import time
import random
import unittest
import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from grpc.experimental import aio
from tests.unit.framework.common import test_constants
from tests_aio.unit._test_base import AioTestBase
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
_WATCH_SERVICE = 'grpc.test.WatchService'
_LARGE_NUMBER_OF_STATUS_CHANGES = 1000
async def _pipe_to_queue(call, queue):
async for response in call:
await queue.put(response)
class HealthServicerTest(AioTestBase):
async def setUp(self):
self._servicer = health.aio.HealthServicer()
await self._servicer.set(health.OVERALL_HEALTH,
health_pb2.HealthCheckResponse.SERVING)
await self._servicer.set(_SERVING_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
await self._servicer.set(_UNKNOWN_SERVICE,
health_pb2.HealthCheckResponse.UNKNOWN)
await self._servicer.set(_NOT_SERVING_SERVICE,
health_pb2.HealthCheckResponse.NOT_SERVING)
self._server = aio.server()
port = self._server.add_insecure_port('[::]:0')
health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
self._server)
await self._server.start()
self._channel = aio.insecure_channel('localhost:%d' % port)
self._stub = health_pb2_grpc.HealthStub(self._channel)
async def tearDown(self):
await self._channel.close()
await self._server.stop(None)
async def test_check_empty_service(self):
request = health_pb2.HealthCheckRequest()
resp = await self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
async def test_check_serving_service(self):
request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
resp = await self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
async def test_check_unknown_service(self):
request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
resp = await self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
async def test_check_not_serving_service(self):
request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
resp = await self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
resp.status)
async def test_check_not_found_service(self):
request = health_pb2.HealthCheckRequest(service='not-found')
with self.assertRaises(aio.AioRpcError) as context:
await self._stub.Check(request)
self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
async def test_health_service_name(self):
self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
async def test_watch_empty_service(self):
request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
(await queue.get()).status)
call.cancel()
await task
self.assertTrue(queue.empty())
async def test_watch_new_service(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue.get()).status)
await self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
(await queue.get()).status)
await self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.NOT_SERVING)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
(await queue.get()).status)
call.cancel()
await task
self.assertTrue(queue.empty())
async def test_watch_service_isolation(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue.get()).status)
await self._servicer.set('some-other-service',
health_pb2.HealthCheckResponse.SERVING)
# The change of health status in other service should be isolated.
# Hence, no additional notification should be observed.
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)
call.cancel()
await task
self.assertTrue(queue.empty())
async def test_two_watchers(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
call1 = self._stub.Watch(request)
call2 = self._stub.Watch(request)
task1 = self.loop.create_task(_pipe_to_queue(call1, queue1))
task2 = self.loop.create_task(_pipe_to_queue(call2, queue2))
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue1.get()).status)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue2.get()).status)
await self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
(await queue1.get()).status)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
(await queue2.get()).status)
call1.cancel()
call2.cancel()
await task1
await task2
self.assertTrue(queue1.empty())
self.assertTrue(queue2.empty())
async def test_cancelled_watch_removed_from_watch_list(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue.get()).status)
call.cancel()
await self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
await task
# Wait for the serving coroutine to process client cancellation.
timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
while (time.monotonic() < timeout and self._servicer._server_watchers):
await asyncio.sleep(1)
self.assertFalse(self._servicer._server_watchers,
'There should not be any watcher left')
self.assertTrue(queue.empty())
async def test_graceful_shutdown(self):
request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
(await queue.get()).status)
await self._servicer.enter_graceful_shutdown()
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
(await queue.get()).status)
# This should be a no-op.
await self._servicer.set(health.OVERALL_HEALTH,
health_pb2.HealthCheckResponse.SERVING)
resp = await self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
resp.status)
call.cancel()
await task
self.assertTrue(queue.empty())
async def test_no_duplicate_status(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
call = self._stub.Watch(request)
queue = asyncio.Queue()
task = self.loop.create_task(_pipe_to_queue(call, queue))
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
(await queue.get()).status)
last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
if random.randint(0, 1) == 0:
status = health_pb2.HealthCheckResponse.SERVING
else:
status = health_pb2.HealthCheckResponse.NOT_SERVING
await self._servicer.set(_WATCH_SERVICE, status)
if status != last_status:
self.assertEqual(status, (await queue.get()).status)
last_status = status
call.cancel()
await task
self.assertTrue(queue.empty())
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)

@ -1,5 +1,6 @@
[ [
"_sanity._sanity_test.AioSanityTest", "_sanity._sanity_test.AioSanityTest",
"health_check.health_servicer_test.HealthServicerTest",
"interop.local_interop_test.InsecureLocalInteropTest", "interop.local_interop_test.InsecureLocalInteropTest",
"interop.local_interop_test.SecureLocalInteropTest", "interop.local_interop_test.SecureLocalInteropTest",
"unit.abort_test.TestAbort", "unit.abort_test.TestAbort",

Loading…
Cancel
Save