[Python AIO] Raise resource_exhausted error in case of concurrent RPC limit exceeded (#35376)

Fix: https://github.com/grpc/grpc/issues/30424

Currently, the implementation of the `maximum_concurrent_rpcs` flag in AIO does not match its docstring: instead of raising `resource_exhausted` as the docstring states, we wait for in-flight RPCs to finish before continuing execution.

This PR changes the implementation of the `maximum_concurrent_rpcs` flag in the AIO stack to match the docstring: once the limit is reached, new RPCs are rejected immediately with `RESOURCE_EXHAUSTED`.
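For illustration, here is a minimal sketch of the behavior this change produces; the address, method path, and payloads are hypothetical and not part of this PR. Once the server has `maximum_concurrent_rpcs` calls in flight, an additional call now fails fast instead of queueing:

    import grpc
    from grpc import aio

    async def demo() -> None:
        # Server side: cap in-flight RPCs (handlers, ports, and start()
        # omitted for brevity).
        server = aio.server(maximum_concurrent_rpcs=10)

        # Client side: an RPC issued while all 10 slots are busy now fails
        # immediately rather than waiting for a slot to free up.
        async with aio.insecure_channel("localhost:50051") as channel:
            stub = channel.unary_unary("/demo.Echo/Hello")  # hypothetical method
            try:
                await stub(b"ping")
            except aio.AioRpcError as rpc_error:
                assert rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED
                assert "Concurrent RPC limit exceeded" in rpc_error.details()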


Closes #35376

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35376 from XuanWang-Amos:fix_aio_concurrent_rpc 5b2b59fb9f
PiperOrigin-RevId: 601490418
Author: Xuan Wang (committed by Copybara-Service)
parent 80f3a90556
commit bdfacdec7f
Changed files:
  src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi (3 changed lines)
  src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi (52 changed lines)
  src/python/grpcio_tests/tests_aio/unit/server_test.py (29 changed lines)

--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
@@ -72,8 +72,7 @@ cdef enum AioServerStatus:
 cdef class _ConcurrentRpcLimiter:
     cdef int _maximum_concurrent_rpcs
     cdef int _active_rpcs
-    cdef object _active_rpcs_condition  # asyncio.Condition
-    cdef object _loop  # asyncio.EventLoop
+    cdef bint limiter_concurrency_exceeded


 cdef class AioServer:

--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
@@ -783,7 +783,7 @@ async def _schedule_rpc_coro(object rpc_coro,
 async def _handle_rpc(list generic_handlers, tuple interceptors,
-                      RPCState rpc_state, object loop):
+                      RPCState rpc_state, object loop, bint concurrency_exceeded):
     cdef object method_handler
     # Finds the method handler (application logic)
     method_handler = await _find_method_handler(
@@ -804,6 +804,18 @@ async def _handle_rpc(list generic_handlers, tuple interceptors,
         )
         return

+    if concurrency_exceeded:
+        rpc_state.status_sent = True
+        await _send_error_status_from_server(
+            rpc_state,
+            StatusCode.resource_exhausted,
+            'Concurrent RPC limit exceeded!',
+            _IMMUTABLE_EMPTY_METADATA,
+            rpc_state.create_send_initial_metadata_op_if_not_sent(),
+            loop
+        )
+        return
+
     # Handles unary-unary case
     if not method_handler.request_streaming and not method_handler.response_streaming:
         await _handle_unary_unary_rpc(method_handler,
@@ -847,33 +859,23 @@ cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHan
 cdef class _ConcurrentRpcLimiter:

-    def __cinit__(self, int maximum_concurrent_rpcs, object loop):
+    def __cinit__(self, int maximum_concurrent_rpcs):
         if maximum_concurrent_rpcs <= 0:
             raise ValueError("maximum_concurrent_rpcs should be a postive integer")
         self._maximum_concurrent_rpcs = maximum_concurrent_rpcs
         self._active_rpcs = 0
-        self._active_rpcs_condition = asyncio.Condition()
-        self._loop = loop
+        self.limiter_concurrency_exceeded = False

-    async def check_before_request_call(self):
-        await self._active_rpcs_condition.acquire()
-        try:
-            predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs
-            await self._active_rpcs_condition.wait_for(predicate)
+    def check_before_request_call(self):
+        if self._active_rpcs >= self._maximum_concurrent_rpcs:
+            self.limiter_concurrency_exceeded = True
+        else:
             self._active_rpcs += 1
-        finally:
-            self._active_rpcs_condition.release()

-    async def _decrease_active_rpcs_count_with_lock(self):
-        await self._active_rpcs_condition.acquire()
-        try:
-            self._active_rpcs -= 1
-            self._active_rpcs_condition.notify()
-        finally:
-            self._active_rpcs_condition.release()

     def _decrease_active_rpcs_count(self, unused_future):
-        self._loop.create_task(self._decrease_active_rpcs_count_with_lock())
+        self._active_rpcs -= 1
+        if self._active_rpcs < self._maximum_concurrent_rpcs:
+            self.limiter_concurrency_exceeded = False

     def decrease_once_finished(self, object rpc_task):
         rpc_task.add_done_callback(self._decrease_active_rpcs_count)
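Stripped of Cython declarations, the new limiter semantics amount to the plain-Python sketch below; this is an illustration of the hunk above, not the shipped code. The blocking condition-variable wait is gone: the check is synchronous, overload is recorded in `limiter_concurrency_exceeded`, and the slot is released by a task done-callback.

    import asyncio

    class ConcurrentRpcLimiter:
        # Plain-Python mirror of the Cython _ConcurrentRpcLimiter above.
        def __init__(self, maximum_concurrent_rpcs: int):
            if maximum_concurrent_rpcs <= 0:
                raise ValueError(
                    "maximum_concurrent_rpcs should be a positive integer")
            self._maximum_concurrent_rpcs = maximum_concurrent_rpcs
            self._active_rpcs = 0
            self.limiter_concurrency_exceeded = False

        def check_before_request_call(self) -> None:
            # Synchronous check: flag overload instead of waiting for a slot.
            if self._active_rpcs >= self._maximum_concurrent_rpcs:
                self.limiter_concurrency_exceeded = True
            else:
                self._active_rpcs += 1

        def _decrease_active_rpcs_count(self, unused_task) -> None:
            self._active_rpcs -= 1
            if self._active_rpcs < self._maximum_concurrent_rpcs:
                self.limiter_concurrency_exceeded = False

        def decrease_once_finished(self, rpc_task: asyncio.Task) -> None:
            # Release the slot (and clear the overload flag) when the RPC
            # task completes, whatever its outcome.
            rpc_task.add_done_callback(self._decrease_active_rpcs_count)

The trade-off is that admission is now decided at request time on the event loop: there is no queue to drain and no await on the hot path.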
@@ -915,8 +917,7 @@ cdef class AioServer:
         self._thread_pool = thread_pool
         if maximum_concurrent_rpcs is not None:
-            self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs,
-                                                  loop)
+            self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs)

     def add_generic_rpc_handlers(self, object generic_rpc_handlers):
         self._generic_handlers.extend(generic_rpc_handlers)
@@ -959,8 +960,10 @@ cdef class AioServer:
             if self._status != AIO_SERVER_STATUS_RUNNING:
                 break

+            concurrency_exceeded = False
             if self._limiter is not None:
-                await self._limiter.check_before_request_call()
+                self._limiter.check_before_request_call()
+                concurrency_exceeded = self._limiter.limiter_concurrency_exceeded

             # Accepts new request from Core
             rpc_state = await self._request_call()
@@ -973,7 +976,8 @@ cdef class AioServer:
             rpc_coro = _handle_rpc(self._generic_handlers,
                                    self._interceptors,
                                    rpc_state,
-                                   self._loop)
+                                   self._loop,
+                                   concurrency_exceeded)

             # Fires off a task that listens on the cancellation from client.
             rpc_task = self._loop.create_task(
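Tying the pieces together, the serve loop now performs the synchronous limiter check before accepting each call from Core and threads the resulting flag into `_handle_rpc`. Note that the call is still accepted even when the limit is hit, precisely so the server can answer it with an explicit `RESOURCE_EXHAUSTED` status rather than leave the client waiting. A simplified, hypothetical rendering (names here are illustrative; the real loop lives in Cython):

    import asyncio

    async def serve_loop(limiter, request_call, handle_rpc, running) -> None:
        # limiter: the ConcurrentRpcLimiter sketch above, or None to disable.
        # request_call: coroutine that accepts the next RPC from the transport.
        # handle_rpc: coroutine function taking (rpc_state, concurrency_exceeded).
        # running: callable returning False once the server is shutting down.
        while running():
            concurrency_exceeded = False
            if limiter is not None:
                limiter.check_before_request_call()  # synchronous now
                concurrency_exceeded = limiter.limiter_concurrency_exceeded
            rpc_state = await request_call()  # still accept the call from Core
            rpc_task = asyncio.get_running_loop().create_task(
                handle_rpc(rpc_state, concurrency_exceeded))
            if limiter is not None:
                limiter.decrease_once_finished(rpc_task)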

--- a/src/python/grpcio_tests/tests_aio/unit/server_test.py
+++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py
@@ -588,14 +588,29 @@ class TestServer(AioTestBase):
                 coro_wrapper(channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST))
             )
             rpc_tasks.append(task)

-        await_tasks = asyncio.wait(
-            rpc_tasks, return_when=asyncio.FIRST_EXCEPTION
-        )
+        await_tasks = asyncio.wait(rpc_tasks, return_when=asyncio.ALL_COMPLETED)
+        done, _ = await await_tasks
+        exceptions = []
+        for task in done:
+            exception = task.exception()
+            if exception:
+                exceptions.append(exception)
+        # Check whether the number of tasks raised RESOURCE_EXHAUSTED is roughly
+        # 2 * _MAXIMUM_CONCURRENT_RPCS.
+        self.assertAlmostEqual(
+            len(exceptions),
+            2 * _MAXIMUM_CONCURRENT_RPCS,
+            delta=_MAXIMUM_CONCURRENT_RPCS,
+        )

-        # Each batch took test_constants.SHORT_TIMEOUT /2
-        start_time = time.time()
-        await await_tasks
-        elapsed_time = time.time() - start_time
-        self.assertGreater(elapsed_time, test_constants.SHORT_TIMEOUT * 3 / 2)
+        for exception in exceptions:
+            self.assertTrue(isinstance(exception, aio.AioRpcError))
+            self.assertEqual(
+                grpc.StatusCode.RESOURCE_EXHAUSTED, exception.code()
+            )
+            self.assertIn("Concurrent RPC limit exceeded", exception.details())

         # Clean-up
         await channel.close()
         await server.stop(0)
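Since excess RPCs now fail fast instead of queueing server-side, a client that wants the old wait-for-a-slot behavior has to provide it itself. One option, sketched here with an illustrative method path and backoff values, is to retry on RESOURCE_EXHAUSTED with exponential backoff:

    import asyncio
    import grpc
    from grpc import aio

    async def call_with_backoff(channel: aio.Channel, request: bytes,
                                attempts: int = 5,
                                base_delay: float = 0.1) -> bytes:
        stub = channel.unary_unary("/demo.Echo/Hello")  # hypothetical method
        for attempt in range(attempts):
            try:
                return await stub(request)
            except aio.AioRpcError as rpc_error:
                if rpc_error.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
                    raise
                # Back off and retry while the server reports saturation.
                await asyncio.sleep(base_delay * 2 ** attempt)
        raise RuntimeError("server still saturated after retries")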
