diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 916086731e7..e3cb89a81d3 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -10,8 +10,6 @@ pyx_library( "_cygrpc/_hooks.pyx.pxi", "_cygrpc/aio/call.pxd.pxi", "_cygrpc/aio/call.pyx.pxi", - "_cygrpc/aio/rpc_error.pxd.pxi", - "_cygrpc/aio/rpc_error.pyx.pxi", "_cygrpc/aio/callbackcontext.pxd.pxi", "_cygrpc/aio/channel.pxd.pxi", "_cygrpc/aio/channel.pyx.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index f6acc199483..d67d24fcb0b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -13,15 +13,15 @@ # limitations under the License. cimport cpython -import grpc _EMPTY_FLAGS = 0 -_EMPTY_METADATA = None +_EMPTY_METADATA = () _OP_ARRAY_LENGTH = 6 cdef class _AioCall: + def __cinit__(self, AioChannel channel): self._channel = channel self._functor.functor_run = _AioCall.functor_run @@ -59,7 +59,7 @@ cdef class _AioCall: else: call._waiter_call.set_result(None) - async def unary_unary(self, method, request, timeout): + async def unary_unary(self, method, request): cdef grpc_call * call cdef grpc_slice method_slice cdef grpc_op * ops @@ -72,7 +72,7 @@ cdef class _AioCall: cdef Operation receive_status_on_client_operation cdef grpc_call_error call_status - cdef gpr_timespec deadline = _timespec_from_time(timeout) + method_slice = grpc_slice_from_copied_buffer( method, @@ -86,7 +86,7 @@ cdef class _AioCall: self._cq, method_slice, NULL, - deadline, + _timespec_from_time(None), NULL ) @@ -146,12 +146,4 @@ cdef class _AioCall: grpc_call_unref(call) gpr_free(ops) - if receive_status_on_client_operation.code() == grpc._cygrpc.StatusCode.ok: - return receive_message_operation.message() - - raise grpc.experimental.aio.AioRpcError( - receive_initial_metadata_operation.initial_metadata(), - receive_status_on_client_operation.code(), - receive_status_on_client_operation.details(), - receive_status_on_client_operation.trailing_metadata(), - ) + return receive_message_operation.message() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index cbcd4553864..b52c070553d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -18,13 +18,13 @@ cdef class AioChannel: self._target = target def __repr__(self): - class_name = self.__class__.__name__ + class_name = self.__class__.__name__ id_ = id(self) return f"<{class_name} {id_}>" def close(self): grpc_channel_destroy(self.channel) - async def unary_unary(self, method, request, timeout): + async def unary_unary(self, method, request): call = _AioCall(self) - return await call.unary_unary(method, request, timeout) + return await call.unary_unary(method, request) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi deleted file mode 100644 index 5772751a3b6..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pxd.pxi +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Exceptions for the aio version of the RPC calls.""" - - -cdef class _AioRpcError(Exception): - cdef readonly: - tuple _initial_metadata - int _code - str _details - tuple _trailing_metadata - - cpdef tuple initial_metadata(self) - cpdef int code(self) - cpdef str details(self) - cpdef tuple trailing_metadata(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi deleted file mode 100644 index 95b9144eff9..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_error.pyx.pxi +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Exceptions for the aio version of the RPC calls.""" - - -cdef class _AioRpcError(Exception): - - def __cinit__(self, tuple initial_metadata, int code, str details, tuple trailing_metadata): - self._initial_metadata = initial_metadata - self._code = code - self._details = details - self._trailing_metadata = trailing_metadata - - cpdef tuple initial_metadata(self): - return self._initial_metadata - - cpdef int code(self): - return self._code - - cpdef str details(self): - return self._details - - cpdef tuple trailing_metadata(self): - return self._trailing_metadata diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index c4635be72d3..316fb993095 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -63,7 +63,6 @@ include "_cygrpc/aio/iomgr/resolver.pyx.pxi" include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" -include "_cygrpc/aio/rpc_error.pyx.pxi" # diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index b94343b8703..6004126549b 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -14,11 +14,8 @@ """gRPC's Asynchronous Python API.""" import abc -import types import six -import grpc -from grpc._cython import cygrpc from grpc._cython.cygrpc import init_grpc_aio @@ -77,7 +74,6 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): @abc.abstractmethod async def __call__(self, request, - *, timeout=None, metadata=None, credentials=None, @@ -125,25 +121,3 @@ def insecure_channel(target, options=None, compression=None): from grpc.experimental.aio import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None, compression) - - -class _AioRpcError: - """Private implementation of AioRpcError""" - - -class AioRpcError: - """An RpcError to be used by the asynchronous API. - - Parent classes: (cygrpc._AioRpcError, RpcError) - """ - # Dynamically registered as subclass of _AioRpcError and RpcError, because the former one is - # only available after the cython code has been compiled. - _class_built = _AioRpcError - - def __new__(cls, *args, **kwargs): - if cls._class_built is _AioRpcError: - cls._class_built = types.new_class( - "AioRpcError", (cygrpc._AioRpcError, grpc.RpcError)) - cls._class_built.__doc__ = cls.__doc__ - - return cls._class_built(*args, **kwargs) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index 4ded2eb1c09..e3c8fcdbf2f 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -12,42 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. """Invocation-side implementation of gRPC Asyncio Python.""" -import asyncio -from typing import Callable, Optional from grpc import _common from grpc._cython import cygrpc from grpc.experimental import aio -SerializingFunction = Callable[[str], bytes] -DeserializingFunction = Callable[[bytes], str] - class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): - def __init__(self, channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, - response_deserializer: DeserializingFunction) -> None: + def __init__(self, channel, method, request_serializer, + response_deserializer): self._channel = channel self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer - self._loop = asyncio.get_event_loop() - - def _timeout_to_deadline(self, timeout: int) -> Optional[int]: - if timeout is None: - return None - return self._loop.time() + timeout async def __call__(self, request, - *, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None): + if timeout: + raise NotImplementedError("TODO: timeout not implemented yet") + if metadata: raise NotImplementedError("TODO: metadata not implemented yet") @@ -61,11 +51,9 @@ class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): if compression: raise NotImplementedError("TODO: compression not implemented yet") - serialized_request = _common.serialize(request, - self._request_serializer) - timeout = self._timeout_to_deadline(timeout) - response = await self._channel.unary_unary(self._method, - serialized_request, timeout) + response = await self._channel.unary_unary( + self._method, _common.serialize(request, self._request_serializer)) + return _common.deserialize(response, self._response_deserializer) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 23439717b18..49d025a5abe 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,6 +1,5 @@ [ "_sanity._sanity_test.AioSanityTest", "unit.channel_test.TestChannel", - "unit.init_test.TestAioRpcError", "unit.init_test.TestInsecureChannel" ] diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index cdf7a4cd49a..6bc53ec625e 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -15,12 +15,9 @@ import logging import unittest -import grpc - from grpc.experimental import aio from tests_aio.unit import test_base from src.proto.grpc.testing import messages_pb2 -from tests.unit.framework.common import test_constants class TestChannel(test_base.AioTestBase): @@ -55,36 +52,6 @@ class TestChannel(test_base.AioTestBase): self.loop.run_until_complete(coro()) - def test_unary_call_times_out(self): - - async def coro(): - async with aio.insecure_channel(self.server_target) as channel: - empty_call_with_sleep = channel.unary_unary( - "/grpc.testing.TestService/EmptyCall", - request_serializer=messages_pb2.SimpleRequest. - SerializeToString, - response_deserializer=messages_pb2.SimpleResponse. - FromString, - ) - timeout = test_constants.SHORT_TIMEOUT / 2 - # TODO: Update once the async server is ready, change the synchronization mechanism by removing the - # sleep() as both components (client & server) will be on the same process. - with self.assertRaises(grpc.RpcError) as exception_context: - await empty_call_with_sleep( - messages_pb2.SimpleRequest(), timeout=timeout) - - status_code, details = grpc.StatusCode.DEADLINE_EXCEEDED.value - self.assertEqual(exception_context.exception.code(), - status_code) - self.assertEqual(exception_context.exception.details(), - details.title()) - self.assertIsNotNone( - exception_context.exception.initial_metadata()) - self.assertIsNotNone( - exception_context.exception.trailing_metadata()) - - self.loop.run_until_complete(coro()) - if __name__ == '__main__': logging.basicConfig() diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 8371d44574e..ab580f18e11 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -15,50 +15,10 @@ import logging import unittest -import grpc from grpc.experimental import aio from tests_aio.unit import test_base -class TestAioRpcError(unittest.TestCase): - _TEST_INITIAL_METADATA = ("initial metadata",) - _TEST_TRAILING_METADATA = ("trailing metadata",) - - def test_attributes(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - self.assertEqual(aio_rpc_error.initial_metadata(), - self._TEST_INITIAL_METADATA) - self.assertEqual(aio_rpc_error.code(), 0) - self.assertEqual(aio_rpc_error.details(), "details") - self.assertEqual(aio_rpc_error.trailing_metadata(), - self._TEST_TRAILING_METADATA) - - def test_class_hierarchy(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - - self.assertIsInstance(aio_rpc_error, grpc.RpcError) - - def test_class_attributes(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError") - self.assertEqual(aio_rpc_error.__class__.__doc__, - aio.AioRpcError.__doc__) - - def test_class_singleton(self): - first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", - self._TEST_TRAILING_METADATA) - second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", - self._TEST_TRAILING_METADATA) - - self.assertIs(first_aio_rpc_error.__class__, - second_aio_rpc_error.__class__) - - class TestInsecureChannel(test_base.AioTestBase): def test_insecure_channel(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/sync_server.py b/src/python/grpcio_tests/tests_aio/unit/sync_server.py index 82def8ed4c0..105ded8e76c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/sync_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/sync_server.py @@ -20,7 +20,6 @@ from time import sleep import grpc from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2_grpc -from tests.unit.framework.common import test_constants # TODO (https://github.com/grpc/grpc/issues/19762) @@ -30,10 +29,6 @@ class TestServiceServicer(test_pb2_grpc.TestServiceServicer): def UnaryCall(self, request, context): return messages_pb2.SimpleResponse() - def EmptyCall(self, request, context): - while True: - sleep(test_constants.LONG_TIMEOUT) - if __name__ == "__main__": parser = argparse.ArgumentParser(description='Synchronous gRPC server.')