From bc99ddedf0ab2e05dfbaa263577d1a42adc6fe96 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 4 Dec 2019 18:26:11 -0800 Subject: [PATCH] Adopt reviewer's advices --- .../grpc/_cython/_cygrpc/aio/call.pxd.pxi | 2 ++ .../_cython/_cygrpc/aio/iomgr/socket.pxd.pxi | 2 ++ .../_cython/_cygrpc/aio/rpc_status.pxd.pxi | 2 +- .../grpcio/grpc/experimental/aio/__init__.py | 6 ++++- .../grpc/experimental/aio/_base_call.py | 12 +++------- .../grpcio/grpc/experimental/aio/_call.py | 24 +++++++++---------- src/python/grpcio_tests/tests_aio/tests.json | 2 +- 7 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi index bdf5996bd37..3844797c50e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -18,6 +18,8 @@ cdef class _AioCall: AioChannel _channel list _references GrpcCallWrapper _grpc_call_wrapper + # Caches the picked event loop, so we can avoid the 30ns overhead each + # time we need access to the event loop. object _loop # Streaming call only attributes: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi index b205e3f70e4..fef0e1ae1cb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -23,6 +23,8 @@ cdef class _AsyncioSocket: object _task_read object _task_connect char * _read_buffer + # Caches the picked event loop, so we can avoid the 30ns overhead each + # time we need access to the event loop. object _loop # Client-side attributes diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi index a4414b8cfbe..3780d8ddf2f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi @@ -18,7 +18,7 @@ cdef class AioRpcStatus(Exception): cdef readonly: grpc_status_code _code str _details - # On spec, only client-side status has trailing metadata. + # Per the spec, only client-side status has trailing metadata. tuple _trailing_metadata str _debug_error_string diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 3633ad2b598..af45fd2c692 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -11,7 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""gRPC's Asynchronous Python API.""" +"""gRPC's Asynchronous Python API. + +gRPC Async API objects may only be used on the thread on which they were +created. AsyncIO doesn't provide thread safety for most of its APIs. +""" import abc import six diff --git a/src/python/grpcio/grpc/experimental/aio/_base_call.py b/src/python/grpcio/grpc/experimental/aio/_base_call.py index 87c3ec99399..d76096e8e1d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_base_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_base_call.py @@ -55,8 +55,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta): async def initial_metadata(self) -> MetadataType: """Accesses the initial metadata sent by the server. - Coroutine continues once the value is available. - Returns: The initial :term:`metadata`. """ @@ -65,8 +63,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta): async def trailing_metadata(self) -> MetadataType: """Accesses the trailing metadata sent by the server. - Coroutine continues once the value is available. - Returns: The trailing :term:`metadata`. """ @@ -75,8 +71,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta): async def code(self) -> grpc.StatusCode: """Accesses the status code sent by the server. - Coroutine continues once the value is available. - Returns: The StatusCode value for the RPC. """ @@ -85,8 +79,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta): async def details(self) -> Text: """Accesses the details sent by the server. - Coroutine continues once the value is available. - Returns: The details string of the RPC. """ @@ -122,7 +114,9 @@ class UnaryStreamCall( async def read(self) -> ResponseType: """Reads one message from the RPC. - Parallel read operations is not allowed. + Concurrent reads in multiple coroutines are not allowed. If you want to + perform read in multiple coroutines, you needs synchronization. So, you + can start another read after current read is finished. Returns: A response message of the RPC. diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index a62b8d61c7a..d3489506a82 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -148,26 +148,26 @@ class Call(_base_call.Call): _code: grpc.StatusCode _status: Awaitable[cygrpc.AioRpcStatus] _initial_metadata: Awaitable[MetadataType] - _cancellation_future: asyncio.Future + _cancellation: asyncio.Future def __init__(self) -> None: self._loop = asyncio.get_event_loop() self._code = None self._status = self._loop.create_future() self._initial_metadata = self._loop.create_future() - self._cancellation_future = self._loop.create_future() + self._cancellation = self._loop.create_future() def cancel(self) -> bool: - """Virtual cancellation method. + """Placeholder cancellation method. The implementation of this method needs to pass the cancellation reason - into self._cancellation_future, using `set_result` instead of + into self._cancellation, using `set_result` instead of `set_exception`. """ raise NotImplementedError() def cancelled(self) -> bool: - return self._cancellation_future.done( + return self._cancellation.done( ) or self._code == grpc.StatusCode.CANCELLED def done(self) -> bool: @@ -286,7 +286,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): self._method, serialized_request, self._deadline, - self._cancellation_future, + self._cancellation, self._set_initial_metadata, self._set_status, ) @@ -297,8 +297,8 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): def _cancel(self, status: cygrpc.AioRpcStatus) -> bool: """Forwards the application cancellation reasoning.""" - if not self._status.done() and not self._cancellation_future.done(): - self._cancellation_future.set_result(status) + if not self._status.done() and not self._cancellation.done(): + self._cancellation.set_result(status) self._call.cancel() return True else: @@ -363,12 +363,12 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): self._method, serialized_request, self._deadline, - self._cancellation_future, + self._cancellation, self._set_initial_metadata, self._set_status, ) async for serialized_response in async_gen: - if self._cancellation_future.done(): + if self._cancellation.done(): await self._status if self._status.done(): # Raises pre-maturely if final status received here. Generates @@ -390,8 +390,8 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): and the client calling "cancel" at the same time, this method respects the winner in C-Core. """ - if not self._status.done() and not self._cancellation_future.done(): - self._cancellation_future.set_result(status) + if not self._status.done() and not self._cancellation.done(): + self._cancellation.set_result(status) return True else: return False diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 1a46c9d5e69..5d6634b8a8e 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -1,8 +1,8 @@ [ "_sanity._sanity_test.AioSanityTest", "unit.aio_rpc_error_test.TestAioRpcError", - "unit.call_test.TestUnaryUnaryCall", "unit.call_test.TestUnaryStreamCall", + "unit.call_test.TestUnaryUnaryCall", "unit.channel_test.TestChannel", "unit.init_test.TestInsecureChannel", "unit.server_test.TestServer"