|
|
|
@ -781,6 +781,40 @@ cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHan |
|
|
|
|
InternalError) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cdef class _ConcurrentRpcLimiter: |
|
|
|
|
|
|
|
|
|
def __cinit__(self, int maximum_concurrent_rpcs, object loop): |
|
|
|
|
if maximum_concurrent_rpcs <= 0: |
|
|
|
|
raise ValueError("maximum_concurrent_rpcs should be a postive integer") |
|
|
|
|
self._maximum_concurrent_rpcs = maximum_concurrent_rpcs |
|
|
|
|
self._active_rpcs = 0 |
|
|
|
|
self._active_rpcs_condition = asyncio.Condition() |
|
|
|
|
self._loop = loop |
|
|
|
|
|
|
|
|
|
async def check_before_request_call(self): |
|
|
|
|
await self._active_rpcs_condition.acquire() |
|
|
|
|
try: |
|
|
|
|
predicate = lambda: self._active_rpcs < self._maximum_concurrent_rpcs |
|
|
|
|
await self._active_rpcs_condition.wait_for(predicate) |
|
|
|
|
self._active_rpcs += 1 |
|
|
|
|
finally: |
|
|
|
|
self._active_rpcs_condition.release() |
|
|
|
|
|
|
|
|
|
async def _decrease_active_rpcs_count_with_lock(self): |
|
|
|
|
await self._active_rpcs_condition.acquire() |
|
|
|
|
try: |
|
|
|
|
self._active_rpcs -= 1 |
|
|
|
|
self._active_rpcs_condition.notify() |
|
|
|
|
finally: |
|
|
|
|
self._active_rpcs_condition.release() |
|
|
|
|
|
|
|
|
|
def _decrease_active_rpcs_count(self, unused_future): |
|
|
|
|
self._loop.create_task(self._decrease_active_rpcs_count_with_lock()) |
|
|
|
|
|
|
|
|
|
def decrease_once_finished(self, object rpc_task): |
|
|
|
|
rpc_task.add_done_callback(self._decrease_active_rpcs_count) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cdef class AioServer: |
|
|
|
|
|
|
|
|
|
def __init__(self, loop, thread_pool, generic_handlers, interceptors, |
|
|
|
@ -815,9 +849,9 @@ cdef class AioServer: |
|
|
|
|
self._interceptors = () |
|
|
|
|
|
|
|
|
|
self._thread_pool = thread_pool |
|
|
|
|
|
|
|
|
|
if maximum_concurrent_rpcs: |
|
|
|
|
raise NotImplementedError() |
|
|
|
|
if maximum_concurrent_rpcs is not None: |
|
|
|
|
self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs, |
|
|
|
|
loop) |
|
|
|
|
|
|
|
|
|
def add_generic_rpc_handlers(self, object generic_rpc_handlers): |
|
|
|
|
self._generic_handlers.extend(generic_rpc_handlers) |
|
|
|
@ -860,6 +894,9 @@ cdef class AioServer: |
|
|
|
|
if self._status != AIO_SERVER_STATUS_RUNNING: |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
if self._limiter is not None: |
|
|
|
|
await self._limiter.check_before_request_call() |
|
|
|
|
|
|
|
|
|
# Accepts new request from Core |
|
|
|
|
rpc_state = await self._request_call() |
|
|
|
|
|
|
|
|
@ -874,7 +911,7 @@ cdef class AioServer: |
|
|
|
|
self._loop) |
|
|
|
|
|
|
|
|
|
# Fires off a task that listens on the cancellation from client. |
|
|
|
|
self._loop.create_task( |
|
|
|
|
rpc_task = self._loop.create_task( |
|
|
|
|
_schedule_rpc_coro( |
|
|
|
|
rpc_coro, |
|
|
|
|
rpc_state, |
|
|
|
@ -882,6 +919,9 @@ cdef class AioServer: |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if self._limiter is not None: |
|
|
|
|
self._limiter.decrease_once_finished(rpc_task) |
|
|
|
|
|
|
|
|
|
def _serving_task_crash_handler(self, object task): |
|
|
|
|
"""Shutdown the server immediately if unexpectedly exited.""" |
|
|
|
|
if task.cancelled(): |
|
|
|
|