diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index b5b8c5036cd..708a2745fdf 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -1,4 +1,16 @@ -cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. cdef class BaseCompletionQueue: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 85ae0c4561a..4b8b326a2da 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -92,6 +92,7 @@ cdef class CallbackCompletionQueue(BaseCompletionQueue): return self._cq async def shutdown(self): + _LOGGER.debug('CallbackCompletionQueue shutdown') grpc_completion_queue_shutdown(self._cq) await self._shutdown_completed grpc_completion_queue_destroy(self._cq) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 44064ef95ce..fe36a4e63aa 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -27,7 +27,7 @@ grpc_aio_engine = None class AsyncIOEngine(enum.Enum): DEFAULT = 'default' CUSTOM_IO_MANAGER = 'custom' - CQ_POLLER = 'poller' + POLLER = 'poller' def init_grpc_aio(): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi index cd425d2e941..cfab5549b2e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -24,10 +24,14 @@ cdef class _AsyncioSocket: object _task_read object _task_write object _task_connect + object _task_listen char * _read_buffer # Caches the picked event loop, so we can avoid the 30ns overhead each # time we need access to the event loop. object _loop + # TODO(lidiz) Drop after 3.6 deprecation. Python 3.7 introduces methods + # like `is_closing()` to help graceful shutdown. + bint _closed # Client-side attributes grpc_custom_connect_callback _grpc_connect_cb diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 65ee6e24e59..f46e657c263 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -31,10 +31,12 @@ cdef class _AsyncioSocket: self._task_connect = None self._task_read = None self._task_write = None + self._task_listen = None self._read_buffer = NULL self._server = None self._py_socket = None self._peername = None + self._closed = False @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, @@ -159,8 +161,14 @@ cdef class _AsyncioSocket: return self._reader and not self._reader._transport.is_closing() cdef void close(self): + if self._closed: + return + else: + self._closed = True if self.is_connected(): self._writer.close() + if self._task_listen and not self._task_listen.done(): + self._task_listen.close() if self._server: self._server.close() # NOTE(lidiz) If the asyncio.Server is created from a Python socket, @@ -170,6 +178,10 @@ cdef class _AsyncioSocket: self._py_socket.close() def _new_connection_callback(self, object reader, object writer): + # If the socket is closed, stop. + if self._closed: + return + # Close the connection if server is not started yet. if self._grpc_accept_cb == NULL: writer.close() @@ -197,7 +209,7 @@ cdef class _AsyncioSocket: sock=self._py_socket, ) - grpc_aio_loop().create_task(create_asyncio_server()) + self._task_listen = grpc_aio_loop().create_task(create_asyncio_server()) cdef accept(self, grpc_custom_socket* grpc_socket_client, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index d8ed9a41fe7..1b4f61a9c36 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -754,6 +754,7 @@ cdef class AioServer: grace: An optional float indicating the length of grace period in seconds. """ + _LOGGER.debug('server shutdown') if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED: return