Merge pull request #20277 from Skyscanner/grpc-19871/unary_unary_client_aio-implement-timeout

Grpc 19871/unary unary client aio implement timeout
pull/20570/head^2
Lidi Zheng 5 years ago committed by GitHub
commit 308ba0c254
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/python/grpcio/grpc/_cython/BUILD.bazel
  2. 20
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  3. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  4. 27
      src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi
  5. 35
      src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi
  6. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  7. 26
      src/python/grpcio/grpc/experimental/aio/__init__.py
  8. 28
      src/python/grpcio/grpc/experimental/aio/_channel.py
  9. 1
      src/python/grpcio_tests/tests_aio/tests.json
  10. 33
      src/python/grpcio_tests/tests_aio/unit/channel_test.py
  11. 40
      src/python/grpcio_tests/tests_aio/unit/init_test.py
  12. 5
      src/python/grpcio_tests/tests_aio/unit/sync_server.py

@ -10,6 +10,8 @@ pyx_library(
"_cygrpc/_hooks.pyx.pxi",
"_cygrpc/aio/call.pxd.pxi",
"_cygrpc/aio/call.pyx.pxi",
"_cygrpc/aio/rpc_error.pxd.pxi",
"_cygrpc/aio/rpc_error.pyx.pxi",
"_cygrpc/aio/callbackcontext.pxd.pxi",
"_cygrpc/aio/channel.pxd.pxi",
"_cygrpc/aio/channel.pyx.pxi",

@ -13,15 +13,15 @@
# limitations under the License.
cimport cpython
import grpc
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
_EMPTY_METADATA = None
_OP_ARRAY_LENGTH = 6
cdef class _AioCall:
def __cinit__(self, AioChannel channel):
self._channel = channel
self._functor.functor_run = _AioCall.functor_run
@ -59,7 +59,7 @@ cdef class _AioCall:
else:
call._waiter_call.set_result(None)
async def unary_unary(self, method, request):
async def unary_unary(self, method, request, timeout):
cdef grpc_call * call
cdef grpc_slice method_slice
cdef grpc_op * ops
@ -72,7 +72,7 @@ cdef class _AioCall:
cdef Operation receive_status_on_client_operation
cdef grpc_call_error call_status
cdef gpr_timespec deadline = _timespec_from_time(timeout)
method_slice = grpc_slice_from_copied_buffer(
<const char *> method,
@ -86,7 +86,7 @@ cdef class _AioCall:
self._cq,
method_slice,
NULL,
_timespec_from_time(None),
deadline,
NULL
)
@ -146,4 +146,12 @@ cdef class _AioCall:
grpc_call_unref(call)
gpr_free(ops)
return receive_message_operation.message()
if receive_status_on_client_operation.code() == grpc._cygrpc.StatusCode.ok:
return receive_message_operation.message()
raise grpc.experimental.aio.AioRpcError(
receive_initial_metadata_operation.initial_metadata(),
receive_status_on_client_operation.code(),
receive_status_on_client_operation.details(),
receive_status_on_client_operation.trailing_metadata(),
)

@ -25,6 +25,6 @@ cdef class AioChannel:
def close(self):
grpc_channel_destroy(self.channel)
async def unary_unary(self, method, request):
async def unary_unary(self, method, request, timeout):
call = _AioCall(self)
return await call.unary_unary(method, request)
return await call.unary_unary(method, request, timeout)

@ -0,0 +1,27 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions for the aio version of the RPC calls."""
cdef class _AioRpcError(Exception):
cdef readonly:
tuple _initial_metadata
int _code
str _details
tuple _trailing_metadata
cpdef tuple initial_metadata(self)
cpdef int code(self)
cpdef str details(self)
cpdef tuple trailing_metadata(self)

@ -0,0 +1,35 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions for the aio version of the RPC calls."""
cdef class _AioRpcError(Exception):
def __cinit__(self, tuple initial_metadata, int code, str details, tuple trailing_metadata):
self._initial_metadata = initial_metadata
self._code = code
self._details = details
self._trailing_metadata = trailing_metadata
cpdef tuple initial_metadata(self):
return self._initial_metadata
cpdef int code(self):
return self._code
cpdef str details(self):
return self._details
cpdef tuple trailing_metadata(self):
return self._trailing_metadata

@ -63,6 +63,7 @@ include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
include "_cygrpc/aio/grpc_aio.pyx.pxi"
include "_cygrpc/aio/call.pyx.pxi"
include "_cygrpc/aio/channel.pyx.pxi"
include "_cygrpc/aio/rpc_error.pyx.pxi"
#

@ -14,8 +14,11 @@
"""gRPC's Asynchronous Python API."""
import abc
import types
import six
import grpc
from grpc._cython import cygrpc
from grpc._cython.cygrpc import init_grpc_aio
@ -74,6 +77,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
async def __call__(self,
request,
*,
timeout=None,
metadata=None,
credentials=None,
@ -121,3 +125,25 @@ def insecure_channel(target, options=None, compression=None):
from grpc.experimental.aio import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, ()
if options is None else options, None, compression)
class _AioRpcError:
"""Private implementation of AioRpcError"""
class AioRpcError:
"""An RpcError to be used by the asynchronous API.
Parent classes: (cygrpc._AioRpcError, RpcError)
"""
# Dynamically registered as subclass of _AioRpcError and RpcError, because the former one is
# only available after the cython code has been compiled.
_class_built = _AioRpcError
def __new__(cls, *args, **kwargs):
if cls._class_built is _AioRpcError:
cls._class_built = types.new_class(
"AioRpcError", (cygrpc._AioRpcError, grpc.RpcError))
cls._class_built.__doc__ = cls.__doc__
return cls._class_built(*args, **kwargs)

@ -12,32 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invocation-side implementation of gRPC Asyncio Python."""
import asyncio
from typing import Callable, Optional
from grpc import _common
from grpc._cython import cygrpc
from grpc.experimental import aio
SerializingFunction = Callable[[str], bytes]
DeserializingFunction = Callable[[bytes], str]
class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
def __init__(self, channel, method, request_serializer,
response_deserializer):
def __init__(self, channel: cygrpc.AioChannel, method: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction) -> None:
self._channel = channel
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._loop = asyncio.get_event_loop()
def _timeout_to_deadline(self, timeout: int) -> Optional[int]:
if timeout is None:
return None
return self._loop.time() + timeout
async def __call__(self,
request,
*,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None):
if timeout:
raise NotImplementedError("TODO: timeout not implemented yet")
if metadata:
raise NotImplementedError("TODO: metadata not implemented yet")
@ -51,9 +61,11 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
if compression:
raise NotImplementedError("TODO: compression not implemented yet")
response = await self._channel.unary_unary(
self._method, _common.serialize(request, self._request_serializer))
serialized_request = _common.serialize(request,
self._request_serializer)
timeout = self._timeout_to_deadline(timeout)
response = await self._channel.unary_unary(self._method,
serialized_request, timeout)
return _common.deserialize(response, self._response_deserializer)

@ -1,5 +1,6 @@
[
"_sanity._sanity_test.AioSanityTest",
"unit.channel_test.TestChannel",
"unit.init_test.TestAioRpcError",
"unit.init_test.TestInsecureChannel"
]

@ -15,9 +15,12 @@
import logging
import unittest
import grpc
from grpc.experimental import aio
from tests_aio.unit import test_base
from src.proto.grpc.testing import messages_pb2
from tests.unit.framework.common import test_constants
class TestChannel(test_base.AioTestBase):
@ -52,6 +55,36 @@ class TestChannel(test_base.AioTestBase):
self.loop.run_until_complete(coro())
def test_unary_call_times_out(self):
async def coro():
async with aio.insecure_channel(self.server_target) as channel:
empty_call_with_sleep = channel.unary_unary(
"/grpc.testing.TestService/EmptyCall",
request_serializer=messages_pb2.SimpleRequest.
SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.
FromString,
)
timeout = test_constants.SHORT_TIMEOUT / 2
# TODO: Update once the async server is ready, change the synchronization mechanism by removing the
# sleep(<timeout>) as both components (client & server) will be on the same process.
with self.assertRaises(grpc.RpcError) as exception_context:
await empty_call_with_sleep(
messages_pb2.SimpleRequest(), timeout=timeout)
status_code, details = grpc.StatusCode.DEADLINE_EXCEEDED.value
self.assertEqual(exception_context.exception.code(),
status_code)
self.assertEqual(exception_context.exception.details(),
details.title())
self.assertIsNotNone(
exception_context.exception.initial_metadata())
self.assertIsNotNone(
exception_context.exception.trailing_metadata())
self.loop.run_until_complete(coro())
if __name__ == '__main__':
logging.basicConfig()

@ -15,10 +15,50 @@
import logging
import unittest
import grpc
from grpc.experimental import aio
from tests_aio.unit import test_base
class TestAioRpcError(unittest.TestCase):
_TEST_INITIAL_METADATA = ("initial metadata",)
_TEST_TRAILING_METADATA = ("trailing metadata",)
def test_attributes(self):
aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0,
"details", self._TEST_TRAILING_METADATA)
self.assertEqual(aio_rpc_error.initial_metadata(),
self._TEST_INITIAL_METADATA)
self.assertEqual(aio_rpc_error.code(), 0)
self.assertEqual(aio_rpc_error.details(), "details")
self.assertEqual(aio_rpc_error.trailing_metadata(),
self._TEST_TRAILING_METADATA)
def test_class_hierarchy(self):
aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0,
"details", self._TEST_TRAILING_METADATA)
self.assertIsInstance(aio_rpc_error, grpc.RpcError)
def test_class_attributes(self):
aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0,
"details", self._TEST_TRAILING_METADATA)
self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError")
self.assertEqual(aio_rpc_error.__class__.__doc__,
aio.AioRpcError.__doc__)
def test_class_singleton(self):
first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0,
"details",
self._TEST_TRAILING_METADATA)
second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0,
"details",
self._TEST_TRAILING_METADATA)
self.assertIs(first_aio_rpc_error.__class__,
second_aio_rpc_error.__class__)
class TestInsecureChannel(test_base.AioTestBase):
def test_insecure_channel(self):

@ -20,6 +20,7 @@ from time import sleep
import grpc
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
from tests.unit.framework.common import test_constants
# TODO (https://github.com/grpc/grpc/issues/19762)
@ -29,6 +30,10 @@ class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
def UnaryCall(self, request, context):
return messages_pb2.SimpleResponse()
def EmptyCall(self, request, context):
while True:
sleep(test_constants.LONG_TIMEOUT)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Synchronous gRPC server.')

Loading…
Cancel
Save