diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index a5189ad5267..712a589798e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -68,9 +68,3 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef void _poll(self) nogil cdef shutdown(self) - - -cdef class CallbackCompletionQueue(BaseCompletionQueue): - cdef object _shutdown_completed # asyncio.Future - cdef CallbackWrapper _wrapper - cdef object _loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index e3c31882d49..eb33e41aebb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -172,23 +172,3 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): context.callback_wrapper, event.success ) - - -cdef class CallbackCompletionQueue(BaseCompletionQueue): - - def __cinit__(self): - self._loop = get_working_loop() - self._shutdown_completed = self._loop.create_future() - self._wrapper = CallbackWrapper( - self._shutdown_completed, - self._loop, - CQ_SHUTDOWN_FAILURE_HANDLER) - self._cq = grpc_completion_queue_create_for_callback( - self._wrapper.c_functor(), - NULL - ) - - async def shutdown(self): - grpc_completion_queue_shutdown(self._cq) - await self._shutdown_completed - grpc_completion_queue_destroy(self._cq) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 9aec95520fb..7f9f52da7c0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -19,6 +19,8 @@ cdef _AioState _global_aio_state = _AioState() class AsyncIOEngine(enum.Enum): + # NOTE(lidiz) the support for custom_io_manager is removed in favor of the + # EventEngine project, which will be the only IO platform in Core. CUSTOM_IO_MANAGER = 'custom_io_manager' POLLER = 'poller' @@ -40,29 +42,6 @@ cdef class _AioState: self.cq = None -cdef _initialize_custom_io_manager(): - # Activates asyncio IO manager. - # NOTE(lidiz) Custom IO manager must be activated before the first - # `grpc_init()`. Otherwise, some special configurations in Core won't - # pick up the change, and resulted in SEGFAULT or ABORT. - install_asyncio_iomgr() - - # Initializes gRPC Core, must be called before other Core API - grpc_init() - - # Timers are triggered by the Asyncio loop. We disable - # the background thread that is being used by the native - # gRPC iomgr. - grpc_timer_manager_set_threading(False) - - # gRPC callbaks are executed within the same thread used by the Asyncio - # event loop, as it is being done by the other Asyncio callbacks. - Executor.SetThreadingAll(False) - - # Creates the only completion queue - _global_aio_state.cq = CallbackCompletionQueue() - - cdef _initialize_poller(): # Initializes gRPC Core, must be called before other Core API grpc_init() @@ -80,9 +59,7 @@ cdef _actual_aio_initialization(): _LOGGER.debug('Using %s as I/O engine', _global_aio_state.engine) # Initializes the process-level state accordingly - if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: - _initialize_custom_io_manager() - elif _global_aio_state.engine is AsyncIOEngine.POLLER: + if _global_aio_state.engine is AsyncIOEngine.POLLER: _initialize_poller() else: raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) @@ -98,13 +75,7 @@ def _grpc_shutdown_wrapper(_): cdef _actual_aio_shutdown(): - if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: - future = schedule_coro_threadsafe( - _global_aio_state.cq.shutdown(), - (_global_aio_state.cq)._loop - ) - future.add_done_callback(_grpc_shutdown_wrapper) - elif _global_aio_state.engine is AsyncIOEngine.POLLER: + if _global_aio_state.engine is AsyncIOEngine.POLLER: (_global_aio_state.cq).shutdown() grpc_shutdown() else: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi deleted file mode 100644 index 09232767544..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ /dev/null @@ -1,256 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import platform - -from cpython cimport Py_INCREF, Py_DECREF -from libc cimport string - -import socket as native_socket -try: - import ipaddress # CPython 3.3 and above -except ImportError: - pass - -cdef grpc_socket_vtable asyncio_socket_vtable -cdef grpc_custom_resolver_vtable asyncio_resolver_vtable -cdef grpc_custom_timer_vtable asyncio_timer_vtable -cdef grpc_custom_poller_vtable asyncio_pollset_vtable -cdef bint so_reuse_port - - -cdef grpc_error_handle asyncio_socket_init( - grpc_custom_socket* grpc_socket, - int domain) with gil: - socket = _AsyncioSocket.create(grpc_socket, None, None) - Py_INCREF(socket) - grpc_socket.impl = socket - return 0 - - -cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil: - Py_DECREF(<_AsyncioSocket>grpc_socket.impl) - - -cdef void asyncio_socket_connect( - grpc_custom_socket* grpc_socket, - const grpc_sockaddr* addr, - size_t addr_len, - grpc_custom_connect_callback connect_cb) with gil: - host, port = sockaddr_to_tuple(addr, addr_len) - socket = <_AsyncioSocket>grpc_socket.impl - socket.connect(host, port, connect_cb) - - -cdef void asyncio_socket_close( - grpc_custom_socket* grpc_socket, - grpc_custom_close_callback close_cb) with gil: - socket = (<_AsyncioSocket>grpc_socket.impl) - socket.close() - close_cb(grpc_socket) - - -cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil: - socket = (<_AsyncioSocket>grpc_socket.impl) - socket.close() - - -cdef void asyncio_socket_write( - grpc_custom_socket* grpc_socket, - grpc_slice_buffer* slice_buffer, - grpc_custom_write_callback write_cb) with gil: - socket = (<_AsyncioSocket>grpc_socket.impl) - socket.write(slice_buffer, write_cb) - - -cdef void asyncio_socket_read( - grpc_custom_socket* grpc_socket, - char* buffer_, - size_t length, - grpc_custom_read_callback read_cb) with gil: - socket = (<_AsyncioSocket>grpc_socket.impl) - socket.read(buffer_, length, read_cb) - - -cdef grpc_error_handle asyncio_socket_getpeername( - grpc_custom_socket* grpc_socket, - const grpc_sockaddr* addr, - int* length) with gil: - peer = (<_AsyncioSocket>grpc_socket.impl).peername() - - cdef grpc_resolved_address c_addr - hostname = str_to_bytes(peer[0]) - grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) - # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy - string.memcpy(addr, c_addr.addr, c_addr.len) - length[0] = c_addr.len - return grpc_error_none() - - -cdef grpc_error_handle asyncio_socket_getsockname( - grpc_custom_socket* grpc_socket, - const grpc_sockaddr* addr, - int* length) with gil: - """Supplies sock_addr in add_socket_to_server.""" - cdef grpc_resolved_address c_addr - socket = (<_AsyncioSocket>grpc_socket.impl) - if socket is None: - peer = ('0.0.0.0', 0) - else: - peer = socket.sockname() - hostname = str_to_bytes(peer[0]) - grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) - # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy - string.memcpy(addr, c_addr.addr, c_addr.len) - length[0] = c_addr.len - return grpc_error_none() - - -cdef grpc_error_handle asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil: - (<_AsyncioSocket>grpc_socket.impl).listen() - return grpc_error_none() - - -def _asyncio_apply_socket_options(object s, int flags): - # Turn SO_REUSEADDR on for TCP sockets; if we want to support UDS, we will - # need to update this function. - s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) - # SO_REUSEPORT only available in POSIX systems. - if platform.system() != 'Windows': - if GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT & flags: - s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1) - s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) - - -cdef grpc_error_handle asyncio_socket_bind( - grpc_custom_socket* grpc_socket, - const grpc_sockaddr* addr, - size_t len, int flags) with gil: - host, port = sockaddr_to_tuple(addr, len) - try: - ip = ipaddress.ip_address(host) - if isinstance(ip, ipaddress.IPv6Address): - family = native_socket.AF_INET6 - else: - family = native_socket.AF_INET - - socket = native_socket.socket(family=family) - _asyncio_apply_socket_options(socket, flags) - socket.bind((host, port)) - except IOError as io_error: - socket.close() - return socket_error("bind", str(io_error)) - else: - aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket) - cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy - grpc_socket.impl = aio_socket - return grpc_error_none() - - -cdef void asyncio_socket_accept( - grpc_custom_socket* grpc_socket, - grpc_custom_socket* grpc_socket_client, - grpc_custom_accept_callback accept_cb) with gil: - (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb) - - -cdef grpc_error_handle asyncio_resolve( - const char* host, - const char* port, - grpc_resolved_addresses** res) with gil: - result = native_socket.getaddrinfo(host, port) - res[0] = tuples_to_resolvaddr(result) - - -cdef void asyncio_resolve_async( - grpc_custom_resolver* grpc_resolver, - const char* host, - const char* port) with gil: - resolver = _AsyncioResolver.create(grpc_resolver) - resolver.resolve(host, port) - - -cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil: - timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0) - grpc_timer.timer = timer - - -cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil: - # TODO(https://github.com/grpc/grpc/issues/22278) remove this if condition - if grpc_timer.timer == NULL: - return - else: - timer = <_AsyncioTimer>grpc_timer.timer - timer.stop() - - -cdef void asyncio_init_loop() with gil: - pass - - -cdef void asyncio_destroy_loop() with gil: - pass - - -cdef void asyncio_kick_loop() with gil: - pass - - -cdef grpc_error* asyncio_run_loop(size_t timeout_ms) with gil: - return grpc_error_none() - - -def _auth_plugin_callback_wrapper(object cb, - str service_url, - str method_name, - object callback): - get_working_loop().call_soon(cb, service_url, method_name, callback) - - -def install_asyncio_iomgr(): - # Auth plugins invoke user provided logic in another thread by default. We - # need to override that behavior by registering the call to the event loop. - set_async_callback_func(_auth_plugin_callback_wrapper) - - asyncio_resolver_vtable.resolve = asyncio_resolve - asyncio_resolver_vtable.resolve_async = asyncio_resolve_async - - asyncio_socket_vtable.init = asyncio_socket_init - asyncio_socket_vtable.connect = asyncio_socket_connect - asyncio_socket_vtable.destroy = asyncio_socket_destroy - asyncio_socket_vtable.shutdown = asyncio_socket_shutdown - asyncio_socket_vtable.close = asyncio_socket_close - asyncio_socket_vtable.write = asyncio_socket_write - asyncio_socket_vtable.read = asyncio_socket_read - asyncio_socket_vtable.getpeername = asyncio_socket_getpeername - asyncio_socket_vtable.getsockname = asyncio_socket_getsockname - asyncio_socket_vtable.bind = asyncio_socket_bind - asyncio_socket_vtable.listen = asyncio_socket_listen - asyncio_socket_vtable.accept = asyncio_socket_accept - - asyncio_timer_vtable.start = asyncio_timer_start - asyncio_timer_vtable.stop = asyncio_timer_stop - - asyncio_pollset_vtable.init = asyncio_init_loop - asyncio_pollset_vtable.poll = asyncio_run_loop - asyncio_pollset_vtable.kick = asyncio_kick_loop - asyncio_pollset_vtable.shutdown = asyncio_destroy_loop - - grpc_custom_iomgr_init( - &asyncio_socket_vtable, - &asyncio_resolver_vtable, - &asyncio_timer_vtable, - &asyncio_pollset_vtable - ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi deleted file mode 100644 index 51730c15976..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cdef class _AsyncioResolver: - cdef: - object _loop - grpc_custom_resolver* _grpc_resolver - object _task_resolve - - @staticmethod - cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver) - - cdef void resolve(self, const char* host, const char* port) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi deleted file mode 100644 index 64e3e6396e9..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -cdef class _AsyncioResolver: - def __cinit__(self): - self._loop = get_working_loop() - self._grpc_resolver = NULL - self._task_resolve = None - - @staticmethod - cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver): - resolver = _AsyncioResolver() - resolver._grpc_resolver = grpc_resolver - return resolver - - def __repr__(self): - class_name = self.__class__.__name__ - id_ = id(self) - return f"<{class_name} {id_}>" - - async def _async_resolve(self, bytes host, bytes port): - self._task_resolve = None - try: - resolved = await self._loop.getaddrinfo(host, port) - except Exception as e: - grpc_custom_resolve_callback( - self._grpc_resolver, - NULL, - grpc_socket_error("Resolve address [{}:{}] failed: {}: {}".format( - host, port, type(e), str(e)).encode()) - ) - else: - grpc_custom_resolve_callback( - self._grpc_resolver, - tuples_to_resolvaddr(resolved), - 0 - ) - - cdef void resolve(self, const char* host, const char* port): - assert not self._task_resolve - - self._task_resolve = self._loop.create_task( - self._async_resolve(host, port) - ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi deleted file mode 100644 index cfab5549b2e..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -cdef class _AsyncioSocket: - cdef: - # Common attributes - grpc_custom_socket * _grpc_socket - grpc_custom_read_callback _grpc_read_cb - grpc_custom_write_callback _grpc_write_cb - object _reader - object _writer - object _task_read - object _task_write - object _task_connect - object _task_listen - char * _read_buffer - # Caches the picked event loop, so we can avoid the 30ns overhead each - # time we need access to the event loop. - object _loop - # TODO(lidiz) Drop after 3.6 deprecation. Python 3.7 introduces methods - # like `is_closing()` to help graceful shutdown. - bint _closed - - # Client-side attributes - grpc_custom_connect_callback _grpc_connect_cb - - # Server-side attributes - grpc_custom_accept_callback _grpc_accept_cb - grpc_custom_socket * _grpc_client_socket - object _server - object _py_socket - object _peername - - @staticmethod - cdef _AsyncioSocket create( - grpc_custom_socket * grpc_socket, - object reader, - object writer) - @staticmethod - cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket) - - cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb) - cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb) - cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb) - cdef bint is_connected(self) - cdef void close(self) - - cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb) - cdef listen(self) - cdef tuple peername(self) - cdef tuple sockname(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi deleted file mode 100644 index 43817ef87bf..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ /dev/null @@ -1,225 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import socket as native_socket - -from libc cimport string - -cdef int _ASYNCIO_STREAM_DEFAULT_SOCKET_BACKLOG = 100 - - -# TODO(https://github.com/grpc/grpc/issues/21348) Better flow control needed. -cdef class _AsyncioSocket: - def __cinit__(self): - self._grpc_socket = NULL - self._grpc_connect_cb = NULL - self._grpc_read_cb = NULL - self._grpc_write_cb = NULL - self._reader = None - self._writer = None - self._task_connect = None - self._task_read = None - self._task_write = None - self._task_listen = None - self._read_buffer = NULL - self._server = None - self._py_socket = None - self._peername = None - self._closed = False - self._loop = get_working_loop() - - @staticmethod - cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, - object reader, - object writer): - socket = _AsyncioSocket() - socket._grpc_socket = grpc_socket - socket._reader = reader - socket._writer = writer - if writer is not None: - socket._peername = writer.get_extra_info('peername') - return socket - - @staticmethod - cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket): - socket = _AsyncioSocket() - socket._grpc_socket = grpc_socket - socket._py_socket = py_socket - return socket - - def __repr__(self): - class_name = self.__class__.__name__ - id_ = id(self) - connected = self.is_connected() - return f"<{class_name} {id_} connected={connected}>" - - async def _async_connect(self, object host, object port,): - self._task_connect = None - try: - self._reader, self._writer = await asyncio.open_connection(host, port) - except Exception as e: - self._grpc_connect_cb( - self._grpc_socket, - grpc_socket_error("Socket connect failed: {}: {}".format(type(e), str(e)).encode()) - ) - else: - # gRPC default posix implementation disables nagle - # algorithm. - sock = self._writer.transport.get_extra_info('socket') - sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) - - self._grpc_connect_cb( - self._grpc_socket, - 0 - ) - - cdef void connect(self, - object host, - object port, - grpc_custom_connect_callback grpc_connect_cb): - assert not self._reader - assert not self._task_connect - - self._task_connect = self._loop.create_task( - self._async_connect(host, port) - ) - self._grpc_connect_cb = grpc_connect_cb - - async def _async_read(self, size_t length): - self._task_read = None - try: - inbound_buffer = await self._reader.read(n=length) - except ConnectionError as e: - self._grpc_read_cb( - self._grpc_socket, - -1, - grpc_socket_error("Read failed: {}".format(e).encode()) - ) - else: - string.memcpy( - self._read_buffer, - inbound_buffer, - len(inbound_buffer) - ) - self._grpc_read_cb( - self._grpc_socket, - len(inbound_buffer), - 0 - ) - - cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb): - assert not self._task_read - - self._grpc_read_cb = grpc_read_cb - self._read_buffer = buffer_ - self._task_read = self._loop.create_task(self._async_read(length)) - - async def _async_write(self, bytearray outbound_buffer): - self._writer.write(outbound_buffer) - self._task_write = None - try: - await self._writer.drain() - self._grpc_write_cb( - self._grpc_socket, - 0 - ) - except ConnectionError as connection_error: - self._grpc_write_cb( - self._grpc_socket, - grpc_socket_error("Socket write failed: {}".format(connection_error).encode()), - ) - - cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb): - """Performs write to network socket in AsyncIO. - - For each socket, Core guarantees there'll be only one ongoing write. - When the write is finished, we need to call grpc_write_cb to notify - Core that the work is done. - """ - assert not self._task_write - cdef char* start - cdef bytearray outbound_buffer = bytearray() - for i in range(g_slice_buffer.count): - start = grpc_slice_buffer_start(g_slice_buffer, i) - length = grpc_slice_buffer_length(g_slice_buffer, i) - outbound_buffer.extend(start[:length]) - - self._grpc_write_cb = grpc_write_cb - self._task_write = self._loop.create_task(self._async_write(outbound_buffer)) - - cdef bint is_connected(self): - return self._reader and not self._reader._transport.is_closing() - - cdef void close(self): - if self._closed: - return - else: - self._closed = True - if self.is_connected(): - self._writer.close() - if self._task_listen and not self._task_listen.done(): - self._task_listen.close() - if self._server: - self._server.close() - # NOTE(lidiz) If the asyncio.Server is created from a Python socket, - # the server.close() won't release the fd until the close() is called - # for the Python socket. - if self._py_socket: - self._py_socket.close() - - def _new_connection_callback(self, object reader, object writer): - # If the socket is closed, stop. - if self._closed: - return - - # Close the connection if server is not started yet. - if self._grpc_accept_cb == NULL: - writer.close() - return - - client_socket = _AsyncioSocket.create( - self._grpc_client_socket, - reader, - writer, - ) - - self._grpc_client_socket.impl = client_socket - cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy - # Accept callback expects to be called with: - # * grpc_custom_socket: A grpc custom socket for server - # * grpc_custom_socket: A grpc custom socket for client (with new Socket instance) - # * grpc_error: An error object - self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none()) - - cdef listen(self): - self._py_socket.listen(_ASYNCIO_STREAM_DEFAULT_SOCKET_BACKLOG) - async def create_asyncio_server(): - self._server = await asyncio.start_server( - self._new_connection_callback, - sock=self._py_socket, - ) - - self._task_listen = self._loop.create_task(create_asyncio_server()) - - cdef accept(self, - grpc_custom_socket* grpc_socket_client, - grpc_custom_accept_callback grpc_accept_cb): - self._grpc_client_socket = grpc_socket_client - self._grpc_accept_cb = grpc_accept_cb - - cdef tuple peername(self): - return self._peername - - cdef tuple sockname(self): - return self._py_socket.getsockname() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi deleted file mode 100644 index 76c3be0c57c..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cdef class _AsyncioTimer: - cdef: - grpc_custom_timer * _grpc_timer - object _timer_future - bint _active - object _loop - - @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout) - - cdef stop(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi deleted file mode 100644 index f2f7f492901..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -cdef class _AsyncioTimer: - def __cinit__(self): - self._grpc_timer = NULL - self._timer_future = None - self._active = False - self._loop = get_working_loop() - cpython.Py_INCREF(self) - - @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout): - timer = _AsyncioTimer() - timer._grpc_timer = grpc_timer - timer._timer_future = timer._loop.call_later(timeout, timer.on_time_up) - timer._active = True - return timer - - def on_time_up(self): - self._active = False - grpc_custom_timer_callback(self._grpc_timer, 0) - cpython.Py_DECREF(self) - - def __repr__(self): - class_name = self.__class__.__name__ - id_ = id(self) - return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>" - - cdef stop(self): - if not self._active: - return - - self._timer_future.cancel() - self._active = False - cpython.Py_DECREF(self) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 166be370227..70c5b5d31d0 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -42,9 +42,6 @@ IF UNAME_SYSNAME != "Windows": include "_cygrpc/fork_posix.pxd.pxi" # Following pxi files are part of the Aio module -include "_cygrpc/aio/iomgr/socket.pxd.pxi" -include "_cygrpc/aio/iomgr/timer.pxd.pxi" -include "_cygrpc/aio/iomgr/resolver.pxd.pxi" include "_cygrpc/aio/completion_queue.pxd.pxi" include "_cygrpc/aio/rpc_status.pxd.pxi" include "_cygrpc/aio/grpc_aio.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index b958975bb80..4c0452d700f 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -68,10 +68,6 @@ ELSE: include "_cygrpc/fork_posix.pyx.pxi" # Following pxi files are part of the Aio module -include "_cygrpc/aio/iomgr/iomgr.pyx.pxi" -include "_cygrpc/aio/iomgr/socket.pyx.pxi" -include "_cygrpc/aio/iomgr/timer.pyx.pxi" -include "_cygrpc/aio/iomgr/resolver.pyx.pxi" include "_cygrpc/aio/common.pyx.pxi" include "_cygrpc/aio/rpc_status.pyx.pxi" include "_cygrpc/aio/completion_queue.pyx.pxi"