diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index ca39cef905f..d49fe6644c3 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -26,14 +26,9 @@ class Server: def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): self._loop = asyncio.get_event_loop() - self._server = cygrpc.AioServer( - self._loop, - thread_pool, - generic_handlers, - interceptors, - options, - maximum_concurrent_rpcs, - compression) + self._server = cygrpc.AioServer(self._loop, thread_pool, + generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression) self._shutdown_started = False self._shutdown_future = self._loop.create_future() @@ -99,7 +94,7 @@ class Server: If a grace period is specified, all RPCs active at the end of the grace period are aborted. - + If a grace period is not specified (by passing None for `grace`), all existing RPCs are aborted immediately and this method blocks until the last RPC handler terminates. @@ -140,7 +135,7 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout == None: + if timeout is None: await self._shutdown_future else: try: diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index a9069d4a5f0..2809f3b4b37 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -97,7 +97,6 @@ class TestChannel(AioTestBase): self.loop.run_until_complete(coro()) - @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_call_to_the_void(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index f6eabc20529..d8f1c384043 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -20,7 +20,6 @@ from grpc.experimental import aio from tests_aio.unit._test_base import AioTestBase from tests.unit.framework.common import test_constants - _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' _BLOCK_SHORTLY = '/test/BlockShortly' @@ -30,6 +29,7 @@ _RESPONSE = b'\x01\x01\x01' class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self): self._called = asyncio.get_event_loop().create_future() @@ -40,9 +40,8 @@ class _GenericHandler(grpc.GenericRpcHandler): async def _block_forever(self, unused_request, unused_context): await asyncio.get_event_loop().create_future() - async def _block_shortly(self, unused_request, unused_context): - await asyncio.sleep(test_constants.SHORT_TIMEOUT/2) + await asyncio.sleep(test_constants.SHORT_TIMEOUT / 2) return _RESPONSE def service(self, handler_details): @@ -70,6 +69,7 @@ async def _start_test_server(): class TestServer(AioTestBase): def test_unary_unary(self): + async def test_unary_unary_body(): server_target, _, _ = await _start_test_server() @@ -79,14 +79,17 @@ class TestServer(AioTestBase): self.assertEqual(response, _RESPONSE) self.loop.run_until_complete(test_unary_unary_body()) - + def test_shutdown(self): + async def test_shutdown_body(): _, server, _ = await _start_test_server() await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) def test_shutdown_after_call(self): + async def test_shutdown_body(): server_target, server, _ = await _start_test_server() @@ -94,50 +97,59 @@ class TestServer(AioTestBase): await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) def test_graceful_shutdown_success(self): + async def test_graceful_shutdown_success_body(): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task(channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) await generic_handler.wait_for_call() await server.stop(test_constants.SHORT_TIMEOUT) await channel.close() self.assertEqual(await call_task, _RESPONSE) self.assertTrue(call_task.done()) + self.loop.run_until_complete(test_graceful_shutdown_success_body()) def test_graceful_shutdown_failed(self): + async def test_graceful_shutdown_failed_body(): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) await generic_handler.wait_for_call() await server.stop(test_constants.SHORT_TIMEOUT) with self.assertRaises(aio.AioRpcError) as exception_context: await call_task - self.assertEqual(exception_context.exception.code(), grpc.StatusCode.UNAVAILABLE) + self.assertEqual(exception_context.exception.code(), + grpc.StatusCode.UNAVAILABLE) self.assertIn('GOAWAY', exception_context.exception.details()) await channel.close() + self.loop.run_until_complete(test_graceful_shutdown_failed_body()) @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self): async def test_shutdown_body(): - server_target, server, _ =_start_test_server() + server_target, server, _ = _start_test_server() await server.stop(None) # Ensures the server is cleaned up at this point. # Some proper exception should be raised. async with aio.insecure_channel('localhost:%d' % port) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) + self.loop.run_until_complete(test_shutdown_body())