|
|
|
@ -44,6 +44,7 @@ _NUM_SERVER_CREATED = 100 |
|
|
|
|
_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 'grpc.max_receive_message_length' |
|
|
|
|
_MAX_MESSAGE_LENGTH = 1024 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _TestPointerWrapper(object): |
|
|
|
|
|
|
|
|
|
def __int__(self): |
|
|
|
@ -58,9 +59,10 @@ _TEST_CHANNEL_ARGS = ( |
|
|
|
|
('arg6', _TestPointerWrapper()), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_INVALID_TEST_CHANNEL_ARGS = [ |
|
|
|
|
{'foo': 'bar'}, |
|
|
|
|
{ |
|
|
|
|
'foo': 'bar' |
|
|
|
|
}, |
|
|
|
|
(('key',),), |
|
|
|
|
'str', |
|
|
|
|
] |
|
|
|
@ -97,13 +99,16 @@ class TestChannelArgument(AioTestBase): |
|
|
|
|
try: |
|
|
|
|
result = await test_if_reuse_port_enabled(server) |
|
|
|
|
if fact == _ENABLE_REUSE_PORT and not result: |
|
|
|
|
self.fail('Enabled reuse port in options, but not observed in socket') |
|
|
|
|
self.fail( |
|
|
|
|
'Enabled reuse port in options, but not observed in socket' |
|
|
|
|
) |
|
|
|
|
elif fact == _DISABLE_REUSE_PORT and result: |
|
|
|
|
self.fail('Disabled reuse port in options, but observed in socket') |
|
|
|
|
self.fail( |
|
|
|
|
'Disabled reuse port in options, but observed in socket' |
|
|
|
|
) |
|
|
|
|
finally: |
|
|
|
|
await server.stop(None) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def test_client(self): |
|
|
|
|
aio.insecure_channel('[::]:0', options=_TEST_CHANNEL_ARGS) |
|
|
|
|
|
|
|
|
@ -120,33 +125,35 @@ class TestChannelArgument(AioTestBase): |
|
|
|
|
async def test_max_message_length_applied(self): |
|
|
|
|
address, server = await start_test_server() |
|
|
|
|
|
|
|
|
|
async with aio.insecure_channel(address, options=( |
|
|
|
|
(_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, _MAX_MESSAGE_LENGTH), |
|
|
|
|
)) as channel: |
|
|
|
|
async with aio.insecure_channel( |
|
|
|
|
address, |
|
|
|
|
options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, |
|
|
|
|
_MAX_MESSAGE_LENGTH),)) as channel: |
|
|
|
|
stub = test_pb2_grpc.TestServiceStub(channel) |
|
|
|
|
|
|
|
|
|
request = messages_pb2.StreamingOutputCallRequest() |
|
|
|
|
# First request will pass |
|
|
|
|
request.response_parameters.append( |
|
|
|
|
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH//2,) |
|
|
|
|
) |
|
|
|
|
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH // 2,)) |
|
|
|
|
# Second request should fail |
|
|
|
|
request.response_parameters.append( |
|
|
|
|
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH*2,) |
|
|
|
|
) |
|
|
|
|
messages_pb2.ResponseParameters(size=_MAX_MESSAGE_LENGTH * 2,)) |
|
|
|
|
|
|
|
|
|
call = stub.StreamingOutputCall(request) |
|
|
|
|
|
|
|
|
|
response = await call.read() |
|
|
|
|
self.assertEqual(_MAX_MESSAGE_LENGTH//2, len(response.payload.body)) |
|
|
|
|
self.assertEqual(_MAX_MESSAGE_LENGTH // 2, |
|
|
|
|
len(response.payload.body)) |
|
|
|
|
|
|
|
|
|
with self.assertRaises(aio.AioRpcError) as exception_context: |
|
|
|
|
await call.read() |
|
|
|
|
rpc_error = exception_context.exception |
|
|
|
|
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, rpc_error.code()) |
|
|
|
|
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, |
|
|
|
|
rpc_error.code()) |
|
|
|
|
self.assertIn(str(_MAX_MESSAGE_LENGTH), rpc_error.details()) |
|
|
|
|
|
|
|
|
|
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await call.code()) |
|
|
|
|
self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await |
|
|
|
|
call.code()) |
|
|
|
|
|
|
|
|
|
await server.stop(None) |
|
|
|
|
|
|
|
|
|