Adopt reviewer's advices

pull/21232/head
Lidi Zheng 5 years ago
parent 1a3916bc45
commit bc99ddedf0
  1. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi
  2. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  3. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/rpc_status.pxd.pxi
  4. 6
      src/python/grpcio/grpc/experimental/aio/__init__.py
  5. 12
      src/python/grpcio/grpc/experimental/aio/_base_call.py
  6. 24
      src/python/grpcio/grpc/experimental/aio/_call.py
  7. 2
      src/python/grpcio_tests/tests_aio/tests.json

@ -18,6 +18,8 @@ cdef class _AioCall:
AioChannel _channel
list _references
GrpcCallWrapper _grpc_call_wrapper
# Caches the picked event loop, so we can avoid the 30ns overhead each
# time we need access to the event loop.
object _loop
# Streaming call only attributes:

@ -23,6 +23,8 @@ cdef class _AsyncioSocket:
object _task_read
object _task_connect
char * _read_buffer
# Caches the picked event loop, so we can avoid the 30ns overhead each
# time we need access to the event loop.
object _loop
# Client-side attributes

@ -18,7 +18,7 @@ cdef class AioRpcStatus(Exception):
cdef readonly:
grpc_status_code _code
str _details
# On spec, only client-side status has trailing metadata.
# Per the spec, only client-side status has trailing metadata.
tuple _trailing_metadata
str _debug_error_string

@ -11,7 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC's Asynchronous Python API."""
"""gRPC's Asynchronous Python API.
gRPC Async API objects may only be used on the thread on which they were
created. AsyncIO doesn't provide thread safety for most of its APIs.
"""
import abc
import six

@ -55,8 +55,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta):
async def initial_metadata(self) -> MetadataType:
"""Accesses the initial metadata sent by the server.
Coroutine continues once the value is available.
Returns:
The initial :term:`metadata`.
"""
@ -65,8 +63,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta):
async def trailing_metadata(self) -> MetadataType:
"""Accesses the trailing metadata sent by the server.
Coroutine continues once the value is available.
Returns:
The trailing :term:`metadata`.
"""
@ -75,8 +71,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta):
async def code(self) -> grpc.StatusCode:
"""Accesses the status code sent by the server.
Coroutine continues once the value is available.
Returns:
The StatusCode value for the RPC.
"""
@ -85,8 +79,6 @@ class Call(grpc.RpcContext, metaclass=ABCMeta):
async def details(self) -> Text:
"""Accesses the details sent by the server.
Coroutine continues once the value is available.
Returns:
The details string of the RPC.
"""
@ -122,7 +114,9 @@ class UnaryStreamCall(
async def read(self) -> ResponseType:
"""Reads one message from the RPC.
Parallel read operations is not allowed.
Concurrent reads in multiple coroutines are not allowed. If you want to
perform read in multiple coroutines, you needs synchronization. So, you
can start another read after current read is finished.
Returns:
A response message of the RPC.

@ -148,26 +148,26 @@ class Call(_base_call.Call):
_code: grpc.StatusCode
_status: Awaitable[cygrpc.AioRpcStatus]
_initial_metadata: Awaitable[MetadataType]
_cancellation_future: asyncio.Future
_cancellation: asyncio.Future
def __init__(self) -> None:
self._loop = asyncio.get_event_loop()
self._code = None
self._status = self._loop.create_future()
self._initial_metadata = self._loop.create_future()
self._cancellation_future = self._loop.create_future()
self._cancellation = self._loop.create_future()
def cancel(self) -> bool:
"""Virtual cancellation method.
"""Placeholder cancellation method.
The implementation of this method needs to pass the cancellation reason
into self._cancellation_future, using `set_result` instead of
into self._cancellation, using `set_result` instead of
`set_exception`.
"""
raise NotImplementedError()
def cancelled(self) -> bool:
return self._cancellation_future.done(
return self._cancellation.done(
) or self._code == grpc.StatusCode.CANCELLED
def done(self) -> bool:
@ -286,7 +286,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall):
self._method,
serialized_request,
self._deadline,
self._cancellation_future,
self._cancellation,
self._set_initial_metadata,
self._set_status,
)
@ -297,8 +297,8 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall):
def _cancel(self, status: cygrpc.AioRpcStatus) -> bool:
"""Forwards the application cancellation reasoning."""
if not self._status.done() and not self._cancellation_future.done():
self._cancellation_future.set_result(status)
if not self._status.done() and not self._cancellation.done():
self._cancellation.set_result(status)
self._call.cancel()
return True
else:
@ -363,12 +363,12 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall):
self._method,
serialized_request,
self._deadline,
self._cancellation_future,
self._cancellation,
self._set_initial_metadata,
self._set_status,
)
async for serialized_response in async_gen:
if self._cancellation_future.done():
if self._cancellation.done():
await self._status
if self._status.done():
# Raises pre-maturely if final status received here. Generates
@ -390,8 +390,8 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall):
and the client calling "cancel" at the same time, this method respects
the winner in C-Core.
"""
if not self._status.done() and not self._cancellation_future.done():
self._cancellation_future.set_result(status)
if not self._status.done() and not self._cancellation.done():
self._cancellation.set_result(status)
return True
else:
return False

@ -1,8 +1,8 @@
[
"_sanity._sanity_test.AioSanityTest",
"unit.aio_rpc_error_test.TestAioRpcError",
"unit.call_test.TestUnaryUnaryCall",
"unit.call_test.TestUnaryStreamCall",
"unit.call_test.TestUnaryUnaryCall",
"unit.channel_test.TestChannel",
"unit.init_test.TestInsecureChannel",
"unit.server_test.TestServer"

Loading…
Cancel
Save