Merge pull request #21543 from Skyscanner/allow_calling_none_existing_servers

[Aio] Call correctly the connect CB error when an error happens
pull/21444/head
Pau Freixes 5 years ago committed by GitHub
commit a85d001225
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  2. 6
      src/python/grpcio_tests/tests_aio/unit/_test_server.py
  3. 17
      src/python/grpcio_tests/tests_aio/unit/call_test.py
  4. 34
      src/python/grpcio_tests/tests_aio/unit/channel_test.py

@ -66,6 +66,7 @@ cdef class _AsyncioSocket:
<grpc_custom_socket*>self._grpc_socket, <grpc_custom_socket*>self._grpc_socket,
grpc_socket_error("Socket connect failed: {}".format(e).encode()) grpc_socket_error("Socket connect failed: {}".format(e).encode())
) )
return
finally: finally:
self._task_connect = None self._task_connect = None

@ -27,12 +27,6 @@ class _TestServiceServicer(test_pb2_grpc.TestServiceServicer):
async def UnaryCall(self, request, context): async def UnaryCall(self, request, context):
return messages_pb2.SimpleResponse() return messages_pb2.SimpleResponse()
# TODO(lidizheng) The semantic of this call is not matching its description
# See src/proto/grpc/testing/test.proto
async def EmptyCall(self, request, context):
while True:
await asyncio.sleep(test_constants.LONG_TIMEOUT)
async def StreamingOutputCall( async def StreamingOutputCall(
self, request: messages_pb2.StreamingOutputCallRequest, context): self, request: messages_pb2.StreamingOutputCallRequest, context):
for response_parameters in request.response_parameters: for response_parameters in request.response_parameters:

@ -31,6 +31,7 @@ _NUM_STREAM_RESPONSES = 5
_RESPONSE_PAYLOAD_SIZE = 42 _RESPONSE_PAYLOAD_SIZE = 42
_LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!' _LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!'
_RESPONSE_INTERVAL_US = test_constants.SHORT_TIMEOUT * 1000 * 1000 _RESPONSE_INTERVAL_US = test_constants.SHORT_TIMEOUT * 1000 * 1000
_UNREACHABLE_TARGET = '0.1:1111'
class TestUnaryUnaryCall(AioTestBase): class TestUnaryUnaryCall(AioTestBase):
@ -63,20 +64,14 @@ class TestUnaryUnaryCall(AioTestBase):
self.assertIs(response, response_retry) self.assertIs(response, response_retry)
async def test_call_rpc_error(self): async def test_call_rpc_error(self):
async with aio.insecure_channel(self._server_target) as channel: async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel:
empty_call_with_sleep = channel.unary_unary( hi = channel.unary_unary(
"/grpc.testing.TestService/EmptyCall", '/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString, request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString, response_deserializer=messages_pb2.SimpleResponse.FromString,
) )
timeout = test_constants.SHORT_TIMEOUT / 2
# TODO(https://github.com/grpc/grpc/issues/20869 call = hi(messages_pb2.SimpleRequest(), timeout=0.1)
# Update once the async server is ready, change the
# synchronization mechanism by removing the sleep(<timeout>)
# as both components (client & server) will be on the same
# process.
call = empty_call_with_sleep(
messages_pb2.SimpleRequest(), timeout=timeout)
with self.assertRaises(grpc.RpcError) as exception_context: with self.assertRaises(grpc.RpcError) as exception_context:
await call await call

@ -27,11 +27,10 @@ from tests_aio.unit._test_server import start_test_server
from tests_aio.unit._test_base import AioTestBase from tests_aio.unit._test_base import AioTestBase
_UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall' _UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall'
_EMPTY_CALL_METHOD = '/grpc.testing.TestService/EmptyCall'
_STREAMING_OUTPUT_CALL_METHOD = '/grpc.testing.TestService/StreamingOutputCall' _STREAMING_OUTPUT_CALL_METHOD = '/grpc.testing.TestService/StreamingOutputCall'
_NUM_STREAM_RESPONSES = 5 _NUM_STREAM_RESPONSES = 5
_RESPONSE_PAYLOAD_SIZE = 42 _RESPONSE_PAYLOAD_SIZE = 42
_UNREACHABLE_TARGET = '0.1:1111'
class TestChannel(AioTestBase): class TestChannel(AioTestBase):
@ -62,21 +61,15 @@ class TestChannel(AioTestBase):
self.assertIsInstance(response, messages_pb2.SimpleResponse) self.assertIsInstance(response, messages_pb2.SimpleResponse)
async def test_unary_call_times_out(self): async def test_unary_call_times_out(self):
async with aio.insecure_channel(self._server_target) as channel: async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel:
empty_call_with_sleep = channel.unary_unary( hi = channel.unary_unary(
_EMPTY_CALL_METHOD, _UNARY_CALL_METHOD,
request_serializer=messages_pb2.SimpleRequest.SerializeToString, request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString, response_deserializer=messages_pb2.SimpleResponse.FromString,
) )
timeout = test_constants.SHORT_TIMEOUT / 2
# TODO(https://github.com/grpc/grpc/issues/20869)
# Update once the async server is ready, change the
# synchronization mechanism by removing the sleep(<timeout>)
# as both components (client & server) will be on the same
# process.
with self.assertRaises(grpc.RpcError) as exception_context: with self.assertRaises(grpc.RpcError) as exception_context:
await empty_call_with_sleep( await hi(messages_pb2.SimpleRequest(), timeout=1.0)
messages_pb2.SimpleRequest(), timeout=timeout)
_, details = grpc.StatusCode.DEADLINE_EXCEEDED.value # pylint: disable=unused-variable _, details = grpc.StatusCode.DEADLINE_EXCEEDED.value # pylint: disable=unused-variable
self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
@ -87,19 +80,6 @@ class TestChannel(AioTestBase):
self.assertIsNotNone( self.assertIsNotNone(
exception_context.exception.trailing_metadata()) exception_context.exception.trailing_metadata())
@unittest.skip('https://github.com/grpc/grpc/issues/20818')
async def test_call_to_the_void(self):
channel = aio.insecure_channel('0.1.1.1:1111')
hi = channel.unary_unary(
_UNARY_CALL_METHOD,
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
response = await hi(messages_pb2.SimpleRequest())
self.assertIsInstance(response, messages_pb2.SimpleResponse)
await channel.close()
async def test_unary_stream(self): async def test_unary_stream(self):
channel = aio.insecure_channel(self._server_target) channel = aio.insecure_channel(self._server_target)
stub = test_pb2_grpc.TestServiceStub(channel) stub = test_pb2_grpc.TestServiceStub(channel)
@ -127,5 +107,5 @@ class TestChannel(AioTestBase):
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.WARN)
unittest.main(verbosity=2) unittest.main(verbosity=2)

Loading…
Cancel
Save