Catch InteralError in _write (#27240)

pull/27746/head
clayg 3 years ago committed by GitHub
parent 4567af504e
commit fbf9801b83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/python/grpcio/grpc/aio/_call.py
  2. 51
      src/python/grpcio_tests/tests_aio/unit/server_test.py

@ -444,6 +444,8 @@ class _StreamRequestMixin(Call):
self._request_serializer)
try:
await self._cython_call.send_serialized_message(serialized_request)
except cygrpc.InternalError:
await self._raise_for_status()
except asyncio.CancelledError:
if not self.cancelled():
self.cancel()

@ -13,9 +13,7 @@
# limitations under the License.
import asyncio
import gc
import logging
import socket
import time
import unittest
@ -40,6 +38,7 @@ _STREAM_STREAM_READER_WRITER = '/test/StreamStreamReaderWriter'
_STREAM_STREAM_EVILLY_MIXED = '/test/StreamStreamEvillyMixed'
_UNIMPLEMENTED_METHOD = '/test/UnimplementedMethod'
_ERROR_IN_STREAM_STREAM = '/test/ErrorInStreamStream'
_ERROR_IN_STREAM_UNARY = '/test/ErrorInStreamUnary'
_ERROR_WITHOUT_RAISE_IN_UNARY_UNARY = '/test/ErrorWithoutRaiseInUnaryUnary'
_ERROR_WITHOUT_RAISE_IN_STREAM_STREAM = '/test/ErrorWithoutRaiseInStreamStream'
@ -91,6 +90,9 @@ class _GenericHandler(grpc.GenericRpcHandler):
_ERROR_IN_STREAM_STREAM:
grpc.stream_stream_rpc_method_handler(
self._error_in_stream_stream),
_ERROR_IN_STREAM_UNARY:
grpc.stream_unary_rpc_method_handler(
self._value_error_in_stream_unary),
_ERROR_WITHOUT_RAISE_IN_UNARY_UNARY:
grpc.unary_unary_rpc_method_handler(
self._error_without_raise_in_unary_unary),
@ -179,6 +181,14 @@ class _GenericHandler(grpc.GenericRpcHandler):
raise RuntimeError('A testing RuntimeError!')
yield _RESPONSE
async def _value_error_in_stream_unary(self, request_iterator, context):
request_count = 0
async for request in request_iterator:
assert _REQUEST == request
request_count += 1
if request_count >= 1:
raise ValueError('The test server has a bug!')
async def _error_without_raise_in_unary_unary(self, request, context):
assert _REQUEST == request
context.set_code(grpc.StatusCode.INTERNAL)
@ -270,6 +280,24 @@ class TestServer(AioTestBase):
self.assertEqual(_RESPONSE, response)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_stream_unary_async_generator_with_request_iter(self):
stream_unary_call = self._channel.stream_unary(_STREAM_UNARY_ASYNC_GEN)
finished = False
def request_gen():
for _ in range(_NUM_STREAM_REQUESTS):
yield _REQUEST
nonlocal finished
finished = True
call = stream_unary_call(request_gen())
response = await call
self.assertEqual(_RESPONSE, response)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
self.assertEqual(finished, True)
async def test_stream_unary_reader_writer(self):
stream_unary_call = self._channel.stream_unary(
_STREAM_UNARY_READER_WRITER)
@ -468,6 +496,25 @@ class TestServer(AioTestBase):
self.assertEqual(grpc.StatusCode.INTERNAL, await call.code())
async def test_error_in_stream_unary(self):
stream_unary_call = self._channel.stream_unary(_ERROR_IN_STREAM_UNARY)
finished = False
def request_gen():
for _ in range(_NUM_STREAM_REQUESTS):
yield _REQUEST
nonlocal finished
finished = True
call = stream_unary_call(request_gen())
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
rpc_error = exception_context.exception
self.assertEqual(grpc.StatusCode.UNKNOWN, rpc_error.code())
self.assertEqual(finished, False)
async def test_port_binding_exception(self):
server = aio.server(options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('localhost:0')

Loading…
Cancel
Save