Apply feedback review

pull/22821/head
Pau Freixes 5 years ago
parent 7b869e8442
commit 39d8fa3f0b
  1. 2
      src/python/grpcio/grpc/experimental/aio/_call.py
  2. 11
      src/python/grpcio/grpc/experimental/aio/_channel.py
  3. 19
      src/python/grpcio/grpc/experimental/aio/_interceptor.py

@ -35,7 +35,7 @@ _LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!'
_GC_CANCELLATION_DETAILS = 'Cancelled upon garbage collection!'
_RPC_ALREADY_FINISHED_DETAILS = 'RPC already finished.'
_RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".'
_API_STYLE_ERROR = 'Please don\'t mix two styles of API for streaming requests'
_API_STYLE_ERROR = 'The iterator and read/write APIs may not be mixed on a single RPC.'
_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n'
'\tstatus = {}\n'

@ -256,11 +256,12 @@ class Channel(_base_channel.Channel):
if invalid_interceptors:
raise ValueError(
"Interceptor must be "+\
"UnaryUnaryClientInterceptors or "+\
"UnaryUnaryClientInterceptors or "+\
"StreamUnaryClientInterceptors. The following are invalid: {}"\
.format(invalid_interceptors))
"Interceptor must be " +
"{} or ".format(UnaryUnaryClientInterceptor.__name__) +
"{} or ".format(UnaryStreamClientInterceptor.__name__) +
"{}. ".format(StreamUnaryClientInterceptor.__name__) +
"The following are invalid: {}".format(invalid_interceptors)
)
self._loop = asyncio.get_event_loop()
self._channel = cygrpc.AioChannel(

@ -104,7 +104,7 @@ class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
Args:
continuation: A coroutine that proceeds with the invocation by
executing the next interceptor in chain or invoking the
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
@ -141,7 +141,7 @@ class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
Args:
continuation: A coroutine that proceeds with the invocation by
executing the next interceptor in chain or invoking the
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
@ -174,9 +174,14 @@ class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
) -> StreamUnaryCall:
"""Intercepts a stream-unary invocation asynchronously.
Within the interceptor the usage of the call methods like `write` or
even awaiting the call should be done carefully, since the caller
could be expecting an untouched call, for example for start writing
messages to it.
Args:
continuation: A coroutine that proceeds with the invocation by
executing the next interceptor in chain or invoking the
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
@ -564,7 +569,7 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
_write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
_write_to_iterator_queue: Optional[asyncio.Queue]
_FINISH_ITERATOR_SENTINEL = tuple()
_FINISH_ITERATOR_SENTINEL = object()
# pylint: disable=too-many-arguments
def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor],
@ -577,11 +582,11 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop
self._channel = channel
if not request_iterator:
if request_iterator is None:
# We provide our own request iterator which is a proxy
# of the futures writes that will be done by the caller.
self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
self._write_to_iterator_async_gen = self._proxies_writes_as_a_request_iteerator(
self._write_to_iterator_async_gen = self._proxy_writes_as_request_iterator(
)
request_iterator = self._write_to_iterator_async_gen
else:
@ -636,7 +641,7 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
def time_remaining(self) -> Optional[float]:
raise NotImplementedError()
async def _proxies_writes_as_a_request_iteerator(self):
async def _proxy_writes_as_request_iterator(self):
await self._interceptors_task
while True:

Loading…
Cancel
Save