From fb3911f243a1942b3e62967560ee48ec55c23b30 Mon Sep 17 00:00:00 2001 From: Mariano Anaya Date: Mon, 16 Sep 2019 14:36:17 +0200 Subject: [PATCH] Implement timeout for the async unary_unary call Allow passing the ``timeout`` parameter to the asynchronous version of the ``unary_unary`` call, and use it accordingly. Maintains the same interface as the synchronous version. Other changes: * Remove default parameters from the internal API methods * Make keyword-only arguments in the external-facing public API Create new exception: ``AioRpcError``. Define the exception in Cython, exposing a similar interface that the one returned by the synchronous API (``grpc.RpcError``). Then mix the class with the ``grpc.RpcError``, dynamically: this can only be done at run-time because it's not possible to use the Cython class until all Cython code has been compiled, which happens after the ``grpc`` module has been loaded. The new ``AioRpcError`` exception lives inside the ``experimental`` module. Fixes https://github.com/grpc/grpc/issues/19871 --- src/python/grpcio/grpc/_cython/BUILD.bazel | 2 + .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 20 +++++++--- .../grpc/_cython/_cygrpc/aio/channel.pyx.pxi | 6 +-- .../_cython/_cygrpc/aio/rpc_error.pxd.pxi | 27 +++++++++++++ .../_cython/_cygrpc/aio/rpc_error.pyx.pxi | 35 ++++++++++++++++ src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + .../grpcio/grpc/experimental/aio/__init__.py | 26 ++++++++++++ .../grpcio/grpc/experimental/aio/_channel.py | 28 +++++++++---- src/python/grpcio_tests/tests_aio/tests.json | 1 + .../tests_aio/unit/channel_test.py | 33 +++++++++++++++ .../grpcio_tests/tests_aio/unit/init_test.py | 40 +++++++++++++++++++ .../tests_aio/unit/sync_server.py | 5 +++ 12 files changed, 207 insertions(+), 17 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index e3cb89a81d3..916086731e7 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -10,6 +10,8 @@ pyx_library( "_cygrpc/_hooks.pyx.pxi", "_cygrpc/aio/call.pxd.pxi", "_cygrpc/aio/call.pyx.pxi", + "_cygrpc/aio/rpc_error.pxd.pxi", + "_cygrpc/aio/rpc_error.pyx.pxi", "_cygrpc/aio/callbackcontext.pxd.pxi", "_cygrpc/aio/channel.pxd.pxi", "_cygrpc/aio/channel.pyx.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index d67d24fcb0b..f6acc199483 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -13,15 +13,15 @@ # limitations under the License. cimport cpython +import grpc _EMPTY_FLAGS = 0 -_EMPTY_METADATA = () +_EMPTY_METADATA = None _OP_ARRAY_LENGTH = 6 cdef class _AioCall: - def __cinit__(self, AioChannel channel): self._channel = channel self._functor.functor_run = _AioCall.functor_run @@ -59,7 +59,7 @@ cdef class _AioCall: else: call._waiter_call.set_result(None) - async def unary_unary(self, method, request): + async def unary_unary(self, method, request, timeout): cdef grpc_call * call cdef grpc_slice method_slice cdef grpc_op * ops @@ -72,7 +72,7 @@ cdef class _AioCall: cdef Operation receive_status_on_client_operation cdef grpc_call_error call_status - + cdef gpr_timespec deadline = _timespec_from_time(timeout) method_slice = grpc_slice_from_copied_buffer( method, @@ -86,7 +86,7 @@ cdef class _AioCall: self._cq, method_slice, NULL, - _timespec_from_time(None), + deadline, NULL ) @@ -146,4 +146,12 @@ cdef class _AioCall: grpc_call_unref(call) gpr_free(ops) - return receive_message_operation.message() + if receive_status_on_client_operation.code() == grpc._cygrpc.StatusCode.ok: + return receive_message_operation.message() + + raise grpc.experimental.aio.AioRpcError( + receive_initial_metadata_operation.initial_metadata(), + receive_status_on_client_operation.code(), + receive_status_on_client_operation.details(), + receive_status_on_client_operation.trailing_metadata(), + ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index b52c070553d..cbcd4553864 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -18,13 +18,13 @@ cdef class AioChannel: self._target = target def __repr__(self): - class_name = self.__class__.__name__ + class_name = self.__class__.__name__ id_ = id(self) return f"<{class_name} {id_}>" def close(self): grpc_channel_destroy(self.channel) - async def unary_unary(self, method, request): + async def unary_unary(self, method, request, timeout): call = _AioCall(self) - return await call.unary_unary(method, request) + return await call.unary_unary(method, request, timeout) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi new file mode 100644 index 00000000000..5772751a3b6 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi @@ -0,0 +1,27 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Exceptions for the aio version of the RPC calls.""" + + +cdef class _AioRpcError(Exception): + cdef readonly: + tuple _initial_metadata + int _code + str _details + tuple _trailing_metadata + + cpdef tuple initial_metadata(self) + cpdef int code(self) + cpdef str details(self) + cpdef tuple trailing_metadata(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi new file mode 100644 index 00000000000..95b9144eff9 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi @@ -0,0 +1,35 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Exceptions for the aio version of the RPC calls.""" + + +cdef class _AioRpcError(Exception): + + def __cinit__(self, tuple initial_metadata, int code, str details, tuple trailing_metadata): + self._initial_metadata = initial_metadata + self._code = code + self._details = details + self._trailing_metadata = trailing_metadata + + cpdef tuple initial_metadata(self): + return self._initial_metadata + + cpdef int code(self): + return self._code + + cpdef str details(self): + return self._details + + cpdef tuple trailing_metadata(self): + return self._trailing_metadata diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 316fb993095..c4635be72d3 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -63,6 +63,7 @@ include "_cygrpc/aio/iomgr/resolver.pyx.pxi" include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" +include "_cygrpc/aio/rpc_error.pyx.pxi" # diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 6004126549b..b94343b8703 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -14,8 +14,11 @@ """gRPC's Asynchronous Python API.""" import abc +import types import six +import grpc +from grpc._cython import cygrpc from grpc._cython.cygrpc import init_grpc_aio @@ -74,6 +77,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): @abc.abstractmethod async def __call__(self, request, + *, timeout=None, metadata=None, credentials=None, @@ -121,3 +125,25 @@ def insecure_channel(target, options=None, compression=None): from grpc.experimental.aio import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None, compression) + + +class _AioRpcError: + """Private implementation of AioRpcError""" + + +class AioRpcError: + """An RpcError to be used by the asynchronous API. + + Parent classes: (cygrpc._AioRpcError, RpcError) + """ + # Dynamically registered as subclass of _AioRpcError and RpcError, because the former one is + # only available after the cython code has been compiled. + _class_built = _AioRpcError + + def __new__(cls, *args, **kwargs): + if cls._class_built is _AioRpcError: + cls._class_built = types.new_class( + "AioRpcError", (cygrpc._AioRpcError, grpc.RpcError)) + cls._class_built.__doc__ = cls.__doc__ + + return cls._class_built(*args, **kwargs) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index e3c8fcdbf2f..4ded2eb1c09 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -12,32 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. """Invocation-side implementation of gRPC Asyncio Python.""" +import asyncio +from typing import Callable, Optional from grpc import _common from grpc._cython import cygrpc from grpc.experimental import aio +SerializingFunction = Callable[[str], bytes] +DeserializingFunction = Callable[[bytes], str] + class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): - def __init__(self, channel, method, request_serializer, - response_deserializer): + def __init__(self, channel: cygrpc.AioChannel, method: bytes, + request_serializer: SerializingFunction, + response_deserializer: DeserializingFunction) -> None: self._channel = channel self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._loop = asyncio.get_event_loop() + + def _timeout_to_deadline(self, timeout: int) -> Optional[int]: + if timeout is None: + return None + return self._loop.time() + timeout async def __call__(self, request, + *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): - if timeout: - raise NotImplementedError("TODO: timeout not implemented yet") - if metadata: raise NotImplementedError("TODO: metadata not implemented yet") @@ -51,9 +61,11 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): if compression: raise NotImplementedError("TODO: compression not implemented yet") - response = await self._channel.unary_unary( - self._method, _common.serialize(request, self._request_serializer)) - + serialized_request = _common.serialize(request, + self._request_serializer) + timeout = self._timeout_to_deadline(timeout) + response = await self._channel.unary_unary(self._method, + serialized_request, timeout) return _common.deserialize(response, self._response_deserializer) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 49d025a5abe..23439717b18 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,5 +1,6 @@ [ "_sanity._sanity_test.AioSanityTest", "unit.channel_test.TestChannel", + "unit.init_test.TestAioRpcError", "unit.init_test.TestInsecureChannel" ] diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 6bc53ec625e..cdf7a4cd49a 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -15,9 +15,12 @@ import logging import unittest +import grpc + from grpc.experimental import aio from tests_aio.unit import test_base from src.proto.grpc.testing import messages_pb2 +from tests.unit.framework.common import test_constants class TestChannel(test_base.AioTestBase): @@ -52,6 +55,36 @@ class TestChannel(test_base.AioTestBase): self.loop.run_until_complete(coro()) + def test_unary_call_times_out(self): + + async def coro(): + async with aio.insecure_channel(self.server_target) as channel: + empty_call_with_sleep = channel.unary_unary( + "/grpc.testing.TestService/EmptyCall", + request_serializer=messages_pb2.SimpleRequest. + SerializeToString, + response_deserializer=messages_pb2.SimpleResponse. + FromString, + ) + timeout = test_constants.SHORT_TIMEOUT / 2 + # TODO: Update once the async server is ready, change the synchronization mechanism by removing the + # sleep() as both components (client & server) will be on the same process. + with self.assertRaises(grpc.RpcError) as exception_context: + await empty_call_with_sleep( + messages_pb2.SimpleRequest(), timeout=timeout) + + status_code, details = grpc.StatusCode.DEADLINE_EXCEEDED.value + self.assertEqual(exception_context.exception.code(), + status_code) + self.assertEqual(exception_context.exception.details(), + details.title()) + self.assertIsNotNone( + exception_context.exception.initial_metadata()) + self.assertIsNotNone( + exception_context.exception.trailing_metadata()) + + self.loop.run_until_complete(coro()) + if __name__ == '__main__': logging.basicConfig() diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index ab580f18e11..8371d44574e 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -15,10 +15,50 @@ import logging import unittest +import grpc from grpc.experimental import aio from tests_aio.unit import test_base +class TestAioRpcError(unittest.TestCase): + _TEST_INITIAL_METADATA = ("initial metadata",) + _TEST_TRAILING_METADATA = ("trailing metadata",) + + def test_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.initial_metadata(), + self._TEST_INITIAL_METADATA) + self.assertEqual(aio_rpc_error.code(), 0) + self.assertEqual(aio_rpc_error.details(), "details") + self.assertEqual(aio_rpc_error.trailing_metadata(), + self._TEST_TRAILING_METADATA) + + def test_class_hierarchy(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + + self.assertIsInstance(aio_rpc_error, grpc.RpcError) + + def test_class_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError") + self.assertEqual(aio_rpc_error.__class__.__doc__, + aio.AioRpcError.__doc__) + + def test_class_singleton(self): + first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + + self.assertIs(first_aio_rpc_error.__class__, + second_aio_rpc_error.__class__) + + class TestInsecureChannel(test_base.AioTestBase): def test_insecure_channel(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/sync_server.py b/src/python/grpcio_tests/tests_aio/unit/sync_server.py index 105ded8e76c..82def8ed4c0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/sync_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/sync_server.py @@ -20,6 +20,7 @@ from time import sleep import grpc from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc +from tests.unit.framework.common import test_constants # TODO (https://github.com/grpc/grpc/issues/19762) @@ -29,6 +30,10 @@ class TestServiceServicer(test_pb2_grpc.TestServiceServicer): def UnaryCall(self, request, context): return messages_pb2.SimpleResponse() + def EmptyCall(self, request, context): + while True: + sleep(test_constants.LONG_TIMEOUT) + if __name__ == "__main__": parser = argparse.ArgumentParser(description='Synchronous gRPC server.')