diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 7699cea85cd..cde2a95fe71 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -125,7 +125,7 @@ cdef class _ServicerContext: if self._rpc_state.metadata_sent: raise RuntimeError('Send initial metadata failed: already sent') else: - await _send_initial_metadata(self._rpc_state, metadata, self._loop) + await _send_initial_metadata(self._rpc_state, metadata, _EMPTY_FLAG, self._loop) self._rpc_state.metadata_sent = True async def abort(self, diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 3add587be6c..d09eb3b5e08 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -357,7 +357,9 @@ class _StreamRequestMixin(Call): # Rpc status should be exposed through other API. Exceptions raised # within this Task won't be retrieved by another coroutine. It's # better to suppress the error than spamming users' screen. - _LOGGER.debug('Exception while consuming of the request_iterator: %s', rpc_error) + _LOGGER.debug( + 'Exception while consuming of the request_iterator: %s', + rpc_error) async def write(self, request: RequestType) -> None: if self.done(): @@ -406,13 +408,13 @@ class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): def __init__(self, request: RequestType, deadline: Optional[float], metadata: MetadataType, credentials: Optional[grpc.CallCredentials], - wait_for_ready: Optional[bool], - channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, + wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, + method: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, loop: asyncio.AbstractEventLoop) -> None: - super().__init__(channel.call(method, deadline, credentials, wait_for_ready), metadata, - request_serializer, response_deserializer, loop) + super().__init__( + channel.call(method, deadline, credentials, wait_for_ready), + metadata, request_serializer, response_deserializer, loop) self._request = request self._init_unary_response_mixin(self._invoke()) @@ -449,13 +451,13 @@ class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): def __init__(self, request: RequestType, deadline: Optional[float], metadata: MetadataType, credentials: Optional[grpc.CallCredentials], - wait_for_ready: Optional[bool], - channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, + wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, + method: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, loop: asyncio.AbstractEventLoop) -> None: - super().__init__(channel.call(method, deadline, credentials, wait_for_ready), metadata, - request_serializer, response_deserializer, loop) + super().__init__( + channel.call(method, deadline, credentials, wait_for_ready), + metadata, request_serializer, response_deserializer, loop) self._request = request self._send_unary_request_task = loop.create_task( self._send_unary_request()) @@ -485,13 +487,13 @@ class StreamUnaryCall(_StreamRequestMixin, _UnaryResponseMixin, Call, request_async_iterator: Optional[AsyncIterable[RequestType]], deadline: Optional[float], metadata: MetadataType, credentials: Optional[grpc.CallCredentials], - wait_for_ready: Optional[bool], - channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, + wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, + method: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, loop: asyncio.AbstractEventLoop) -> None: - super().__init__(channel.call(method, deadline, credentials, wait_for_ready), metadata, - request_serializer, response_deserializer, loop) + super().__init__( + channel.call(method, deadline, credentials, wait_for_ready), + metadata, request_serializer, response_deserializer, loop) self._init_stream_request_mixin(request_async_iterator) self._init_unary_response_mixin(self._conduct_rpc()) @@ -524,13 +526,13 @@ class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call, request_async_iterator: Optional[AsyncIterable[RequestType]], deadline: Optional[float], metadata: MetadataType, credentials: Optional[grpc.CallCredentials], - wait_for_ready: Optional[bool], - channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, + wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, + method: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, loop: asyncio.AbstractEventLoop) -> None: - super().__init__(channel.call(method, deadline, credentials, wait_for_ready), metadata, - request_serializer, response_deserializer, loop) + super().__init__( + channel.call(method, deadline, credentials, wait_for_ready), + metadata, request_serializer, response_deserializer, loop) self._initializer = self._loop.create_task(self._prepare_rpc()) self._init_stream_request_mixin(request_async_iterator) self._init_stream_response_mixin(self._initializer) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index 547a5d83b29..3ac35b4a9fa 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -109,17 +109,16 @@ class UnaryUnaryMultiCallable(_BaseMultiCallable): if not self._interceptors: return UnaryUnaryCall(request, _timeout_to_deadline(timeout), - metadata, credentials, wait_for_ready, self._channel, - self._method, self._request_serializer, + metadata, credentials, wait_for_ready, + self._channel, self._method, + self._request_serializer, self._response_deserializer, self._loop) else: - return InterceptedUnaryUnaryCall(self._interceptors, request, - timeout, metadata, credentials, - wait_for_ready, - self._channel, self._method, - self._request_serializer, - self._response_deserializer, - self._loop) + return InterceptedUnaryUnaryCall( + self._interceptors, request, timeout, metadata, credentials, + wait_for_ready, self._channel, self._method, + self._request_serializer, self._response_deserializer, + self._loop) class UnaryStreamMultiCallable(_BaseMultiCallable): @@ -159,8 +158,8 @@ class UnaryStreamMultiCallable(_BaseMultiCallable): if metadata is None: metadata = _IMMUTABLE_EMPTY_TUPLE - return UnaryStreamCall(request, deadline, metadata, credentials,wait_for_ready, - self._channel, self._method, + return UnaryStreamCall(request, deadline, metadata, credentials, + wait_for_ready, self._channel, self._method, self._request_serializer, self._response_deserializer, self._loop) @@ -207,8 +206,8 @@ class StreamUnaryMultiCallable(_BaseMultiCallable): metadata = _IMMUTABLE_EMPTY_TUPLE return StreamUnaryCall(request_async_iterator, deadline, metadata, - credentials, wait_for_ready, self._channel, self._method, - self._request_serializer, + credentials, wait_for_ready, self._channel, + self._method, self._request_serializer, self._response_deserializer, self._loop) @@ -254,8 +253,8 @@ class StreamStreamMultiCallable(_BaseMultiCallable): metadata = _IMMUTABLE_EMPTY_TUPLE return StreamStreamCall(request_async_iterator, deadline, metadata, - credentials, wait_for_ready, self._channel, self._method, - self._request_serializer, + credentials, wait_for_ready, self._channel, + self._method, self._request_serializer, self._response_deserializer, self._loop) diff --git a/src/python/grpcio/grpc/experimental/aio/_interceptor.py b/src/python/grpcio/grpc/experimental/aio/_interceptor.py index 87a28fae796..a822ca69653 100644 --- a/src/python/grpcio/grpc/experimental/aio/_interceptor.py +++ b/src/python/grpcio/grpc/experimental/aio/_interceptor.py @@ -33,13 +33,14 @@ _LOCAL_CANCELLATION_DETAILS = 'Locally cancelled by application!' class ClientCallDetails( collections.namedtuple( 'ClientCallDetails', - ('method', 'timeout', 'metadata', 'credentials')), + ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready')), grpc.ClientCallDetails): method: Text timeout: Optional[float] metadata: Optional[MetadataType] credentials: Optional[grpc.CallCredentials] + wait_for_ready: Optional[bool] class UnaryUnaryClientInterceptor(metaclass=ABCMeta): @@ -108,28 +109,29 @@ class InterceptedUnaryUnaryCall(_base_call.UnaryUnaryCall): request: RequestType, timeout: Optional[float], metadata: MetadataType, credentials: Optional[grpc.CallCredentials], - channel: cygrpc.AioChannel, method: bytes, - request_serializer: SerializingFunction, + wait_for_ready: Optional[bool], channel: cygrpc.AioChannel, + method: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, loop: asyncio.AbstractEventLoop) -> None: self._channel = channel self._loop = loop self._interceptors_task = asyncio.ensure_future(self._invoke( - interceptors, method, timeout, metadata, credentials, request, - request_serializer, response_deserializer), + interceptors, method, timeout, metadata, credentials, + wait_for_ready, request, request_serializer, response_deserializer), loop=loop) def __del__(self): self.cancel() # pylint: disable=too-many-arguments - async def _invoke( - self, interceptors: Sequence[UnaryUnaryClientInterceptor], - method: bytes, timeout: Optional[float], - metadata: Optional[MetadataType], - credentials: Optional[grpc.CallCredentials], request: RequestType, - request_serializer: SerializingFunction, - response_deserializer: DeserializingFunction) -> UnaryUnaryCall: + async def _invoke(self, interceptors: Sequence[UnaryUnaryClientInterceptor], + method: bytes, timeout: Optional[float], + metadata: Optional[MetadataType], + credentials: Optional[grpc.CallCredentials], + wait_for_ready: Optional[bool], request: RequestType, + request_serializer: SerializingFunction, + response_deserializer: DeserializingFunction + ) -> UnaryUnaryCall: """Run the RPC call wrapped in interceptors""" async def _run_interceptor( @@ -154,12 +156,13 @@ class InterceptedUnaryUnaryCall(_base_call.UnaryUnaryCall): return UnaryUnaryCall( request, _timeout_to_deadline(client_call_details.timeout), client_call_details.metadata, - client_call_details.credentials, self._channel, + client_call_details.credentials, + client_call_details.wait_for_ready, self._channel, client_call_details.method, request_serializer, response_deserializer, self._loop) client_call_details = ClientCallDetails(method, timeout, metadata, - credentials) + credentials, wait_for_ready) return await _run_interceptor(iter(interceptors), client_call_details, request) diff --git a/src/python/grpcio_tests/tests_aio/unit/_common.py b/src/python/grpcio_tests/tests_aio/unit/_common.py index 535f3d1b157..05d50b4e92c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_common.py +++ b/src/python/grpcio_tests/tests_aio/unit/_common.py @@ -26,9 +26,11 @@ def seen_metadatum(expected: MetadatumType, actual: MetadataType): return metadata_dict.get(expected[0]) == expected[1] -async def block_until_certain_state(channel: aio.Channel, expected_state: grpc.ChannelConnectivity): +async def block_until_certain_state(channel: aio.Channel, + expected_state: grpc.ChannelConnectivity): state = channel.get_state() while state != expected_state: - import logging;logging.debug('Get %s want %s', state, expected_state) + import logging + logging.debug('Get %s want %s', state, expected_state) await channel.wait_for_state_change(state) state = channel.get_state() diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index a1bd243f81c..967edff204c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -80,17 +80,16 @@ class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel: stub = test_pb2_grpc.TestServiceStub(channel) - call = stub.UnaryCall(messages_pb2.SimpleRequest(), timeout=0.1) + call = stub.UnaryCall(messages_pb2.SimpleRequest()) with self.assertRaises(grpc.RpcError) as exception_context: await call - self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, + self.assertEqual(grpc.StatusCode.UNAVAILABLE, exception_context.exception.code()) self.assertTrue(call.done()) - self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, await - call.code()) + self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code()) # Exception is cached at call object level, reentrance # returns again the same exception diff --git a/src/python/grpcio_tests/tests_aio/unit/connectivity_test.py b/src/python/grpcio_tests/tests_aio/unit/connectivity_test.py index 388be97b2b5..7acf53b95c7 100644 --- a/src/python/grpcio_tests/tests_aio/unit/connectivity_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/connectivity_test.py @@ -23,6 +23,7 @@ import grpc from grpc.experimental import aio from tests.unit.framework.common import test_constants +from tests_aio.unit import _common from tests_aio.unit._constants import UNREACHABLE_TARGET from tests_aio.unit._test_base import AioTestBase from tests_aio.unit._test_server import start_test_server @@ -56,8 +57,8 @@ class TestConnectivityState(AioTestBase): # Should not time out await asyncio.wait_for( - _common.block_until_certain_state(channel, - grpc.ChannelConnectivity.READY), + _common.block_until_certain_state( + channel, grpc.ChannelConnectivity.READY), test_constants.SHORT_TIMEOUT) async def test_timeout(self): @@ -68,8 +69,8 @@ class TestConnectivityState(AioTestBase): # If timed out, the function should return None. with self.assertRaises(asyncio.TimeoutError): await asyncio.wait_for( - _common.block_until_certain_state(channel, - grpc.ChannelConnectivity.READY), + _common.block_until_certain_state( + channel, grpc.ChannelConnectivity.READY), test_constants.SHORT_TIMEOUT) async def test_shutdown(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/interceptor_test.py b/src/python/grpcio_tests/tests_aio/unit/interceptor_test.py index 6d1ae543b34..e2b055edcf1 100644 --- a/src/python/grpcio_tests/tests_aio/unit/interceptor_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/interceptor_test.py @@ -132,7 +132,8 @@ class TestUnaryUnaryClientInterceptor(AioTestBase): method=client_call_details.method, timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2, metadata=client_call_details.metadata, - credentials=client_call_details.credentials) + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready) return await continuation(new_client_call_details, request) interceptor = TimeoutInterceptor() @@ -173,7 +174,8 @@ class TestUnaryUnaryClientInterceptor(AioTestBase): method=client_call_details.method, timeout=_constants.UNARY_CALL_WITH_SLEEP_VALUE / 2, metadata=client_call_details.metadata, - credentials=client_call_details.credentials) + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready) try: call = await continuation(new_client_call_details, request) @@ -187,7 +189,8 @@ class TestUnaryUnaryClientInterceptor(AioTestBase): method=client_call_details.method, timeout=None, metadata=client_call_details.metadata, - credentials=client_call_details.credentials) + credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready) call = await continuation(new_client_call_details, request) self.calls.append(call) @@ -552,6 +555,7 @@ class TestInterceptedUnaryUnaryCall(AioTestBase): metadata=client_call_details.metadata + _INITIAL_METADATA_TO_INJECT, credentials=client_call_details.credentials, + wait_for_ready=client_call_details.wait_for_ready, ) return await continuation(new_details, request) diff --git a/src/python/grpcio_tests/tests_aio/unit/wait_for_ready_test.py b/src/python/grpcio_tests/tests_aio/unit/wait_for_ready_test.py index b5decf7dd01..69e0a39a5af 100644 --- a/src/python/grpcio_tests/tests_aio/unit/wait_for_ready_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/wait_for_ready_test.py @@ -32,8 +32,11 @@ _NUM_STREAM_RESPONSES = 5 _REQUEST_PAYLOAD_SIZE = 7 _RESPONSE_PAYLOAD_SIZE = 42 + async def _perform_unary_unary(stub, wait_for_ready): - await stub.UnaryCall(messages_pb2.SimpleRequest(), timeout=test_constants.SHORT_TIMEOUT, wait_for_ready=wait_for_ready) + await stub.UnaryCall(messages_pb2.SimpleRequest(), + timeout=test_constants.SHORT_TIMEOUT, + wait_for_ready=wait_for_ready) async def _perform_unary_stream(stub, wait_for_ready): @@ -42,7 +45,9 @@ async def _perform_unary_stream(stub, wait_for_ready): request.response_parameters.append( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) - call = stub.StreamingOutputCall(request, timeout=test_constants.SHORT_TIMEOUT, wait_for_ready=wait_for_ready) + call = stub.StreamingOutputCall(request, + timeout=test_constants.SHORT_TIMEOUT, + wait_for_ready=wait_for_ready) for _ in range(_NUM_STREAM_RESPONSES): await call.read() @@ -57,11 +62,14 @@ async def _perform_stream_unary(stub, wait_for_ready): for _ in range(_NUM_STREAM_RESPONSES): yield request - await stub.StreamingInputCall(gen(), timeout=test_constants.SHORT_TIMEOUT, wait_for_ready=wait_for_ready) + await stub.StreamingInputCall(gen(), + timeout=test_constants.SHORT_TIMEOUT, + wait_for_ready=wait_for_ready) async def _perform_stream_stream(stub, wait_for_ready): - call = stub.FullDuplexCall(timeout=test_constants.SHORT_TIMEOUT, wait_for_ready=wait_for_ready) + call = stub.FullDuplexCall(timeout=test_constants.SHORT_TIMEOUT, + wait_for_ready=wait_for_ready) request = messages_pb2.StreamingOutputCallRequest() request.response_parameters.append( @@ -117,8 +125,7 @@ class TestWaitForReady(AioTestBase): # Wait for TRANSIENT_FAILURE, and RPC is not aborting await _common.block_until_certain_state( - self._channel, - grpc.ChannelConnectivity.TRANSIENT_FAILURE) + self._channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE) try: # Start the server