From 0a423d05ca03fd8f787e2c441f80e1408ba6a9c6 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 25 Oct 2019 17:31:58 -0700 Subject: [PATCH] Add 4 server tests and 1 channel tests * Improve the shutdown process * Refactor the AioRpcError --- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 2 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 4 +- .../grpcio/grpc/experimental/aio/__init__.py | 5 + .../tests_aio/unit/channel_test.py | 27 ++++- .../grpcio_tests/tests_aio/unit/init_test.py | 39 +++++++ .../tests_aio/unit/server_test.py | 106 +++++++++++------- 6 files changed, 139 insertions(+), 44 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 2d56a568348..7c5c3fa2e9a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -152,6 +152,8 @@ cdef class _AsyncioSocket: cdef void close(self): if self.is_connected(): self._writer.close() + if self._server: + self._server.close() def _new_connection_callback(self, object reader, object writer): client_socket = _AsyncioSocket.create( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 7f17a695017..413abc24b56 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -305,8 +305,8 @@ cdef class AioServer: async def _server_main_loop(self, object server_started): self._server.start(backup_queue=False) - server_started.set_result(True) cdef RPCState rpc_state + server_started.set_result(True) while True: # When shutdown process starts, no more new connections. @@ -377,7 +377,7 @@ cdef class AioServer: await shutdown_completed else: try: - await asyncio.wait_for(shutdown_completed, grace) + await asyncio.wait_for(asyncio.shield(shutdown_completed), grace) except asyncio.TimeoutError: # Cancels all ongoing calls by the end of grace period. grpc_server_cancel_all_calls(self._server.c_server) diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 696db001133..3a736197d65 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -17,6 +17,11 @@ import abc import six import grpc +<<<<<<< HEAD +======= +from grpc import _common +from grpc._cython import cygrpc +>>>>>>> Add 4 server tests and 1 channel tests from grpc._cython.cygrpc import init_grpc_aio from ._call import AioRpcError diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 96817c61a6f..a9069d4a5f0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -22,6 +22,9 @@ from tests.unit.framework.common import test_constants from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase +_UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall' +_EMPTY_CALL_METHOD = '/grpc.testing.TestService/EmptyCall' + class TestChannel(AioTestBase): @@ -32,7 +35,7 @@ class TestChannel(AioTestBase): async with aio.insecure_channel(server_target) as channel: hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', + _UNARY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest. SerializeToString, response_deserializer=messages_pb2.SimpleResponse.FromString @@ -48,7 +51,7 @@ class TestChannel(AioTestBase): channel = aio.insecure_channel(server_target) hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', + _UNARY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest.SerializeToString, response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) @@ -66,7 +69,7 @@ class TestChannel(AioTestBase): async with aio.insecure_channel(server_target) as channel: empty_call_with_sleep = channel.unary_unary( - "/grpc.testing.TestService/EmptyCall", + _EMPTY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest. SerializeToString, response_deserializer=messages_pb2.SimpleResponse. @@ -95,6 +98,24 @@ class TestChannel(AioTestBase): self.loop.run_until_complete(coro()) + @unittest.skip('https://github.com/grpc/grpc/issues/20818') + def test_call_to_the_void(self): + + async def coro(): + channel = aio.insecure_channel('0.1.1.1:1111') + hi = channel.unary_unary( + _UNARY_CALL_METHOD, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString) + response = await hi(messages_pb2.SimpleRequest()) + + self.assertEqual(type(response), messages_pb2.SimpleResponse) + + await channel.close() + + self.loop.run_until_complete(coro()) + + if __name__ == '__main__': logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 9f5d8bb0d85..300e2d92766 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -19,6 +19,45 @@ from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase +class TestAioRpcError(unittest.TestCase): + _TEST_INITIAL_METADATA = ("initial metadata",) + _TEST_TRAILING_METADATA = ("trailing metadata",) + + def test_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.initial_metadata(), + self._TEST_INITIAL_METADATA) + self.assertEqual(aio_rpc_error.code(), grpc.StatusCode.OK) + self.assertEqual(aio_rpc_error.details(), "details") + self.assertEqual(aio_rpc_error.trailing_metadata(), + self._TEST_TRAILING_METADATA) + + def test_class_hierarchy(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + + self.assertIsInstance(aio_rpc_error, grpc.RpcError) + + def test_class_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError") + self.assertEqual(aio_rpc_error.__class__.__doc__, + aio.AioRpcError.__doc__) + + def test_class_singleton(self): + first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + + self.assertIs(first_aio_rpc_error.__class__, + second_aio_rpc_error.__class__) + + class TestInsecureChannel(AioTestBase): def test_insecure_channel(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index c1e6eddc81e..f6eabc20529 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -18,42 +18,62 @@ import unittest import grpc from grpc.experimental import aio from tests_aio.unit._test_base import AioTestBase +from tests.unit.framework.common import test_constants + _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' +_BLOCK_SHORTLY = '/test/BlockShortly' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' -async def _unary_unary(unused_request, unused_context): - return _RESPONSE +class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self): + self._called = asyncio.get_event_loop().create_future() + @staticmethod + async def _unary_unary(unused_request, unused_context): + return _RESPONSE -async def _block_forever(unused_request, unused_context): - await asyncio.get_event_loop().create_future() + async def _block_forever(self, unused_request, unused_context): + await asyncio.get_event_loop().create_future() -class _GenericHandler(grpc.GenericRpcHandler): + async def _block_shortly(self, unused_request, unused_context): + await asyncio.sleep(test_constants.SHORT_TIMEOUT/2) + return _RESPONSE def service(self, handler_details): + self._called.set_result(None) if handler_details.method == _SIMPLE_UNARY_UNARY: - return grpc.unary_unary_rpc_method_handler(_unary_unary) + return grpc.unary_unary_rpc_method_handler(self._unary_unary) if handler_details.method == _BLOCK_FOREVER: - return grpc.unary_unary_rpc_method_handler(_block_forever) + return grpc.unary_unary_rpc_method_handler(self._block_forever) + if handler_details.method == _BLOCK_SHORTLY: + return grpc.unary_unary_rpc_method_handler(self._block_shortly) + + async def wait_for_call(self): + await self._called + + +async def _start_test_server(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + generic_handler = _GenericHandler() + server.add_generic_rpc_handlers((generic_handler,)) + await server.start() + return 'localhost:%d' % port, server, generic_handler class TestServer(AioTestBase): def test_unary_unary(self): - async def test_unary_unary_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, _, _ = await _start_test_server() - async with aio.insecure_channel('localhost:%d' % port) as channel: + async with aio.insecure_channel(server_target) as channel: unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY) response = await unary_call(_REQUEST) self.assertEqual(response, _RESPONSE) @@ -61,55 +81,63 @@ class TestServer(AioTestBase): self.loop.run_until_complete(test_unary_unary_body()) def test_shutdown(self): - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - await server.start() + _, server, _ = await _start_test_server() await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) def test_shutdown_after_call(self): - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, server, _ = await _start_test_server() - async with aio.insecure_channel('localhost:%d' % port) as channel: + async with aio.insecure_channel(server_target) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) - def test_shutdown_during_call(self): + def test_graceful_shutdown_success(self): + async def test_graceful_shutdown_success_body(): + server_target, server, generic_handler = await _start_test_server() - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task(channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + await generic_handler.wait_for_call() - async with aio.insecure_channel('localhost:%d' % port) as channel: - self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) - await asyncio.sleep(0) + await server.stop(test_constants.SHORT_TIMEOUT) + await channel.close() + self.assertEqual(await call_task, _RESPONSE) + self.assertTrue(call_task.done()) + self.loop.run_until_complete(test_graceful_shutdown_success_body()) - await server.stop(None) - self.loop.run_until_complete(test_shutdown_body()) + def test_graceful_shutdown_failed(self): + async def test_graceful_shutdown_failed_body(): + server_target, server, generic_handler = await _start_test_server() + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + await generic_handler.wait_for_call() + + await server.stop(test_constants.SHORT_TIMEOUT) + + with self.assertRaises(aio.AioRpcError) as exception_context: + await call_task + self.assertEqual(exception_context.exception.code(), grpc.StatusCode.UNAVAILABLE) + self.assertIn('GOAWAY', exception_context.exception.details()) + await channel.close() + self.loop.run_until_complete(test_graceful_shutdown_failed_body()) + + @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self): async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, server, _ =_start_test_server() await server.stop(None) + # Ensures the server is cleaned up at this point. + # Some proper exception should be raised. async with aio.insecure_channel('localhost:%d' % port) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) - self.loop.run_until_complete(test_shutdown_body())