diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 9cab7052318..a0693921461 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -35,7 +35,7 @@ _LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!' _GC_CANCELLATION_DETAILS = 'Cancelled upon garbage collection!' _RPC_ALREADY_FINISHED_DETAILS = 'RPC already finished.' _RPC_HALF_CLOSED_DETAILS = 'RPC is half closed after calling "done_writing".' -_API_STYLE_ERROR = 'Please don\'t mix two styles of API for streaming requests' +_API_STYLE_ERROR = 'The iterator and read/write APIs may not be mixed on a single RPC.' _OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' '\tstatus = {}\n' diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index 731a3351b48..97ffa833bbf 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -256,11 +256,12 @@ class Channel(_base_channel.Channel): if invalid_interceptors: raise ValueError( - "Interceptor must be "+\ - "UnaryUnaryClientInterceptors or "+\ - "UnaryUnaryClientInterceptors or "+\ - "StreamUnaryClientInterceptors. The following are invalid: {}"\ - .format(invalid_interceptors)) + "Interceptor must be " + + "{} or ".format(UnaryUnaryClientInterceptor.__name__) + + "{} or ".format(UnaryStreamClientInterceptor.__name__) + + "{}. ".format(StreamUnaryClientInterceptor.__name__) + + "The following are invalid: {}".format(invalid_interceptors) + ) self._loop = asyncio.get_event_loop() self._channel = cygrpc.AioChannel( diff --git a/src/python/grpcio/grpc/experimental/aio/_interceptor.py b/src/python/grpcio/grpc/experimental/aio/_interceptor.py index eff5e03fc1e..e4969ddb4a5 100644 --- a/src/python/grpcio/grpc/experimental/aio/_interceptor.py +++ b/src/python/grpcio/grpc/experimental/aio/_interceptor.py @@ -104,7 +104,7 @@ class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta): Args: continuation: A coroutine that proceeds with the invocation by - executing the next interceptor in chain or invoking the + executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use @@ -141,7 +141,7 @@ class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta): Args: continuation: A coroutine that proceeds with the invocation by - executing the next interceptor in chain or invoking the + executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use @@ -174,9 +174,14 @@ class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta): ) -> StreamUnaryCall: """Intercepts a stream-unary invocation asynchronously. + Within the interceptor the usage of the call methods like `write` or + even awaiting the call should be done carefully, since the caller + could be expecting an untouched call, for example for start writing + messages to it. + Args: continuation: A coroutine that proceeds with the invocation by - executing the next interceptor in chain or invoking the + executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use @@ -564,7 +569,7 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin, _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]] _write_to_iterator_queue: Optional[asyncio.Queue] - _FINISH_ITERATOR_SENTINEL = tuple() + _FINISH_ITERATOR_SENTINEL = object() # pylint: disable=too-many-arguments def __init__(self, interceptors: Sequence[StreamUnaryClientInterceptor], @@ -577,11 +582,11 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin, loop: asyncio.AbstractEventLoop) -> None: self._loop = loop self._channel = channel - if not request_iterator: + if request_iterator is None: # We provide our own request iterator which is a proxy # of the futures writes that will be done by the caller. self._write_to_iterator_queue = asyncio.Queue(maxsize=1) - self._write_to_iterator_async_gen = self._proxies_writes_as_a_request_iteerator( + self._write_to_iterator_async_gen = self._proxy_writes_as_request_iterator( ) request_iterator = self._write_to_iterator_async_gen else: @@ -636,7 +641,7 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin, def time_remaining(self) -> Optional[float]: raise NotImplementedError() - async def _proxies_writes_as_a_request_iteerator(self): + async def _proxy_writes_as_request_iterator(self): await self._interceptors_task while True: