[Aio] Remove custom IO manager support (#27090)

* [Aio] Remove custom IO manager support

* Patch
pull/27107/head
Lidi Zheng 3 years ago committed by GitHub
parent 5b7c25155e
commit d3d5dec5b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  2. 20
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  3. 37
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  4. 256
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  5. 24
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi
  6. 56
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  7. 63
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  8. 225
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  9. 25
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi
  10. 48
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  11. 3
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  12. 4
      src/python/grpcio/grpc/_cython/cygrpc.pyx

@ -68,9 +68,3 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef void _poll(self) nogil cdef void _poll(self) nogil
cdef shutdown(self) cdef shutdown(self)
cdef class CallbackCompletionQueue(BaseCompletionQueue):
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper
cdef object _loop

@ -172,23 +172,3 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
<CallbackWrapper>context.callback_wrapper, <CallbackWrapper>context.callback_wrapper,
event.success event.success
) )
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._loop = get_working_loop()
self._shutdown_completed = self._loop.create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
self._loop,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
NULL
)
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)

@ -19,6 +19,8 @@ cdef _AioState _global_aio_state = _AioState()
class AsyncIOEngine(enum.Enum): class AsyncIOEngine(enum.Enum):
# NOTE(lidiz) the support for custom_io_manager is removed in favor of the
# EventEngine project, which will be the only IO platform in Core.
CUSTOM_IO_MANAGER = 'custom_io_manager' CUSTOM_IO_MANAGER = 'custom_io_manager'
POLLER = 'poller' POLLER = 'poller'
@ -40,29 +42,6 @@ cdef class _AioState:
self.cq = None self.cq = None
cdef _initialize_custom_io_manager():
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# Initializes gRPC Core, must be called before other Core API
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
# Creates the only completion queue
_global_aio_state.cq = CallbackCompletionQueue()
cdef _initialize_poller(): cdef _initialize_poller():
# Initializes gRPC Core, must be called before other Core API # Initializes gRPC Core, must be called before other Core API
grpc_init() grpc_init()
@ -80,9 +59,7 @@ cdef _actual_aio_initialization():
_LOGGER.debug('Using %s as I/O engine', _global_aio_state.engine) _LOGGER.debug('Using %s as I/O engine', _global_aio_state.engine)
# Initializes the process-level state accordingly # Initializes the process-level state accordingly
if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: if _global_aio_state.engine is AsyncIOEngine.POLLER:
_initialize_custom_io_manager()
elif _global_aio_state.engine is AsyncIOEngine.POLLER:
_initialize_poller() _initialize_poller()
else: else:
raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine)
@ -98,13 +75,7 @@ def _grpc_shutdown_wrapper(_):
cdef _actual_aio_shutdown(): cdef _actual_aio_shutdown():
if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: if _global_aio_state.engine is AsyncIOEngine.POLLER:
future = schedule_coro_threadsafe(
_global_aio_state.cq.shutdown(),
(<CallbackCompletionQueue>_global_aio_state.cq)._loop
)
future.add_done_callback(_grpc_shutdown_wrapper)
elif _global_aio_state.engine is AsyncIOEngine.POLLER:
(<PollerCompletionQueue>_global_aio_state.cq).shutdown() (<PollerCompletionQueue>_global_aio_state.cq).shutdown()
grpc_shutdown() grpc_shutdown()
else: else:

@ -1,256 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import platform
from cpython cimport Py_INCREF, Py_DECREF
from libc cimport string
import socket as native_socket
try:
import ipaddress # CPython 3.3 and above
except ImportError:
pass
cdef grpc_socket_vtable asyncio_socket_vtable
cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
cdef grpc_custom_timer_vtable asyncio_timer_vtable
cdef grpc_custom_poller_vtable asyncio_pollset_vtable
cdef bint so_reuse_port
cdef grpc_error_handle asyncio_socket_init(
grpc_custom_socket* grpc_socket,
int domain) with gil:
socket = _AsyncioSocket.create(grpc_socket, None, None)
Py_INCREF(socket)
grpc_socket.impl = <void*>socket
return <grpc_error_handle>0
cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil:
Py_DECREF(<_AsyncioSocket>grpc_socket.impl)
cdef void asyncio_socket_connect(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback connect_cb) with gil:
host, port = sockaddr_to_tuple(addr, addr_len)
socket = <_AsyncioSocket>grpc_socket.impl
socket.connect(host, port, connect_cb)
cdef void asyncio_socket_close(
grpc_custom_socket* grpc_socket,
grpc_custom_close_callback close_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.close()
close_cb(grpc_socket)
cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.close()
cdef void asyncio_socket_write(
grpc_custom_socket* grpc_socket,
grpc_slice_buffer* slice_buffer,
grpc_custom_write_callback write_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.write(slice_buffer, write_cb)
cdef void asyncio_socket_read(
grpc_custom_socket* grpc_socket,
char* buffer_,
size_t length,
grpc_custom_read_callback read_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.read(buffer_, length, read_cb)
cdef grpc_error_handle asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
peer = (<_AsyncioSocket>grpc_socket.impl).peername()
cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error_handle asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
"""Supplies sock_addr in add_socket_to_server."""
cdef grpc_resolved_address c_addr
socket = (<_AsyncioSocket>grpc_socket.impl)
if socket is None:
peer = ('0.0.0.0', 0)
else:
peer = socket.sockname()
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error_handle asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
(<_AsyncioSocket>grpc_socket.impl).listen()
return grpc_error_none()
def _asyncio_apply_socket_options(object s, int flags):
# Turn SO_REUSEADDR on for TCP sockets; if we want to support UDS, we will
# need to update this function.
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
# SO_REUSEPORT only available in POSIX systems.
if platform.system() != 'Windows':
if GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT & flags:
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1)
s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
cdef grpc_error_handle asyncio_socket_bind(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
host, port = sockaddr_to_tuple(addr, len)
try:
ip = ipaddress.ip_address(host)
if isinstance(ip, ipaddress.IPv6Address):
family = native_socket.AF_INET6
else:
family = native_socket.AF_INET
socket = native_socket.socket(family=family)
_asyncio_apply_socket_options(socket, flags)
socket.bind((host, port))
except IOError as io_error:
socket.close()
return socket_error("bind", str(io_error))
else:
aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy
grpc_socket.impl = <void*>aio_socket
return grpc_error_none()
cdef void asyncio_socket_accept(
grpc_custom_socket* grpc_socket,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback accept_cb) with gil:
(<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)
cdef grpc_error_handle asyncio_resolve(
const char* host,
const char* port,
grpc_resolved_addresses** res) with gil:
result = native_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)
cdef void asyncio_resolve_async(
grpc_custom_resolver* grpc_resolver,
const char* host,
const char* port) with gil:
resolver = _AsyncioResolver.create(grpc_resolver)
resolver.resolve(host, port)
cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0)
grpc_timer.timer = <void*>timer
cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
# TODO(https://github.com/grpc/grpc/issues/22278) remove this if condition
if grpc_timer.timer == NULL:
return
else:
timer = <_AsyncioTimer>grpc_timer.timer
timer.stop()
cdef void asyncio_init_loop() with gil:
pass
cdef void asyncio_destroy_loop() with gil:
pass
cdef void asyncio_kick_loop() with gil:
pass
cdef grpc_error* asyncio_run_loop(size_t timeout_ms) with gil:
return grpc_error_none()
def _auth_plugin_callback_wrapper(object cb,
str service_url,
str method_name,
object callback):
get_working_loop().call_soon(cb, service_url, method_name, callback)
def install_asyncio_iomgr():
# Auth plugins invoke user provided logic in another thread by default. We
# need to override that behavior by registering the call to the event loop.
set_async_callback_func(_auth_plugin_callback_wrapper)
asyncio_resolver_vtable.resolve = asyncio_resolve
asyncio_resolver_vtable.resolve_async = asyncio_resolve_async
asyncio_socket_vtable.init = asyncio_socket_init
asyncio_socket_vtable.connect = asyncio_socket_connect
asyncio_socket_vtable.destroy = asyncio_socket_destroy
asyncio_socket_vtable.shutdown = asyncio_socket_shutdown
asyncio_socket_vtable.close = asyncio_socket_close
asyncio_socket_vtable.write = asyncio_socket_write
asyncio_socket_vtable.read = asyncio_socket_read
asyncio_socket_vtable.getpeername = asyncio_socket_getpeername
asyncio_socket_vtable.getsockname = asyncio_socket_getsockname
asyncio_socket_vtable.bind = asyncio_socket_bind
asyncio_socket_vtable.listen = asyncio_socket_listen
asyncio_socket_vtable.accept = asyncio_socket_accept
asyncio_timer_vtable.start = asyncio_timer_start
asyncio_timer_vtable.stop = asyncio_timer_stop
asyncio_pollset_vtable.init = asyncio_init_loop
asyncio_pollset_vtable.poll = asyncio_run_loop
asyncio_pollset_vtable.kick = asyncio_kick_loop
asyncio_pollset_vtable.shutdown = asyncio_destroy_loop
grpc_custom_iomgr_init(
&asyncio_socket_vtable,
&asyncio_resolver_vtable,
&asyncio_timer_vtable,
&asyncio_pollset_vtable
)

@ -1,24 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioResolver:
cdef:
object _loop
grpc_custom_resolver* _grpc_resolver
object _task_resolve
@staticmethod
cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver)
cdef void resolve(self, const char* host, const char* port)

@ -1,56 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioResolver:
def __cinit__(self):
self._loop = get_working_loop()
self._grpc_resolver = NULL
self._task_resolve = None
@staticmethod
cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver):
resolver = _AsyncioResolver()
resolver._grpc_resolver = grpc_resolver
return resolver
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
async def _async_resolve(self, bytes host, bytes port):
self._task_resolve = None
try:
resolved = await self._loop.getaddrinfo(host, port)
except Exception as e:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
NULL,
grpc_socket_error("Resolve address [{}:{}] failed: {}: {}".format(
host, port, type(e), str(e)).encode())
)
else:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
tuples_to_resolvaddr(resolved),
<grpc_error_handle>0
)
cdef void resolve(self, const char* host, const char* port):
assert not self._task_resolve
self._task_resolve = self._loop.create_task(
self._async_resolve(host, port)
)

@ -1,63 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioSocket:
cdef:
# Common attributes
grpc_custom_socket * _grpc_socket
grpc_custom_read_callback _grpc_read_cb
grpc_custom_write_callback _grpc_write_cb
object _reader
object _writer
object _task_read
object _task_write
object _task_connect
object _task_listen
char * _read_buffer
# Caches the picked event loop, so we can avoid the 30ns overhead each
# time we need access to the event loop.
object _loop
# TODO(lidiz) Drop after 3.6 deprecation. Python 3.7 introduces methods
# like `is_closing()` to help graceful shutdown.
bint _closed
# Client-side attributes
grpc_custom_connect_callback _grpc_connect_cb
# Server-side attributes
grpc_custom_accept_callback _grpc_accept_cb
grpc_custom_socket * _grpc_client_socket
object _server
object _py_socket
object _peername
@staticmethod
cdef _AsyncioSocket create(
grpc_custom_socket * grpc_socket,
object reader,
object writer)
@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket)
cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb)
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb)
cdef bint is_connected(self)
cdef void close(self)
cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb)
cdef listen(self)
cdef tuple peername(self)
cdef tuple sockname(self)

@ -1,225 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket as native_socket
from libc cimport string
cdef int _ASYNCIO_STREAM_DEFAULT_SOCKET_BACKLOG = 100
# TODO(https://github.com/grpc/grpc/issues/21348) Better flow control needed.
cdef class _AsyncioSocket:
def __cinit__(self):
self._grpc_socket = NULL
self._grpc_connect_cb = NULL
self._grpc_read_cb = NULL
self._grpc_write_cb = NULL
self._reader = None
self._writer = None
self._task_connect = None
self._task_read = None
self._task_write = None
self._task_listen = None
self._read_buffer = NULL
self._server = None
self._py_socket = None
self._peername = None
self._closed = False
self._loop = get_working_loop()
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
object reader,
object writer):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._reader = reader
socket._writer = writer
if writer is not None:
socket._peername = writer.get_extra_info('peername')
return socket
@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._py_socket = py_socket
return socket
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
connected = self.is_connected()
return f"<{class_name} {id_} connected={connected}>"
async def _async_connect(self, object host, object port,):
self._task_connect = None
try:
self._reader, self._writer = await asyncio.open_connection(host, port)
except Exception as e:
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
grpc_socket_error("Socket connect failed: {}: {}".format(type(e), str(e)).encode())
)
else:
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error_handle>0
)
cdef void connect(self,
object host,
object port,
grpc_custom_connect_callback grpc_connect_cb):
assert not self._reader
assert not self._task_connect
self._task_connect = self._loop.create_task(
self._async_connect(host, port)
)
self._grpc_connect_cb = grpc_connect_cb
async def _async_read(self, size_t length):
self._task_read = None
try:
inbound_buffer = await self._reader.read(n=length)
except ConnectionError as e:
self._grpc_read_cb(
<grpc_custom_socket*>self._grpc_socket,
-1,
grpc_socket_error("Read failed: {}".format(e).encode())
)
else:
string.memcpy(
<void*>self._read_buffer,
<char*>inbound_buffer,
len(inbound_buffer)
)
self._grpc_read_cb(
<grpc_custom_socket*>self._grpc_socket,
len(inbound_buffer),
<grpc_error_handle>0
)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb):
assert not self._task_read
self._grpc_read_cb = grpc_read_cb
self._read_buffer = buffer_
self._task_read = self._loop.create_task(self._async_read(length))
async def _async_write(self, bytearray outbound_buffer):
self._writer.write(outbound_buffer)
self._task_write = None
try:
await self._writer.drain()
self._grpc_write_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error_handle>0
)
except ConnectionError as connection_error:
self._grpc_write_cb(
<grpc_custom_socket*>self._grpc_socket,
grpc_socket_error("Socket write failed: {}".format(connection_error).encode()),
)
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb):
"""Performs write to network socket in AsyncIO.
For each socket, Core guarantees there'll be only one ongoing write.
When the write is finished, we need to call grpc_write_cb to notify
Core that the work is done.
"""
assert not self._task_write
cdef char* start
cdef bytearray outbound_buffer = bytearray()
for i in range(g_slice_buffer.count):
start = grpc_slice_buffer_start(g_slice_buffer, i)
length = grpc_slice_buffer_length(g_slice_buffer, i)
outbound_buffer.extend(<bytes>start[:length])
self._grpc_write_cb = grpc_write_cb
self._task_write = self._loop.create_task(self._async_write(outbound_buffer))
cdef bint is_connected(self):
return self._reader and not self._reader._transport.is_closing()
cdef void close(self):
if self._closed:
return
else:
self._closed = True
if self.is_connected():
self._writer.close()
if self._task_listen and not self._task_listen.done():
self._task_listen.close()
if self._server:
self._server.close()
# NOTE(lidiz) If the asyncio.Server is created from a Python socket,
# the server.close() won't release the fd until the close() is called
# for the Python socket.
if self._py_socket:
self._py_socket.close()
def _new_connection_callback(self, object reader, object writer):
# If the socket is closed, stop.
if self._closed:
return
# Close the connection if server is not started yet.
if self._grpc_accept_cb == NULL:
writer.close()
return
client_socket = _AsyncioSocket.create(
self._grpc_client_socket,
reader,
writer,
)
self._grpc_client_socket.impl = <void*>client_socket
cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy
# Accept callback expects to be called with:
# * grpc_custom_socket: A grpc custom socket for server
# * grpc_custom_socket: A grpc custom socket for client (with new Socket instance)
# * grpc_error: An error object
self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())
cdef listen(self):
self._py_socket.listen(_ASYNCIO_STREAM_DEFAULT_SOCKET_BACKLOG)
async def create_asyncio_server():
self._server = await asyncio.start_server(
self._new_connection_callback,
sock=self._py_socket,
)
self._task_listen = self._loop.create_task(create_asyncio_server())
cdef accept(self,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback grpc_accept_cb):
self._grpc_client_socket = grpc_socket_client
self._grpc_accept_cb = grpc_accept_cb
cdef tuple peername(self):
return self._peername
cdef tuple sockname(self):
return self._py_socket.getsockname()

@ -1,25 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioTimer:
cdef:
grpc_custom_timer * _grpc_timer
object _timer_future
bint _active
object _loop
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout)
cdef stop(self)

@ -1,48 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioTimer:
def __cinit__(self):
self._grpc_timer = NULL
self._timer_future = None
self._active = False
self._loop = get_working_loop()
cpython.Py_INCREF(self)
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout):
timer = _AsyncioTimer()
timer._grpc_timer = grpc_timer
timer._timer_future = timer._loop.call_later(timeout, timer.on_time_up)
timer._active = True
return timer
def on_time_up(self):
self._active = False
grpc_custom_timer_callback(self._grpc_timer, <grpc_error_handle>0)
cpython.Py_DECREF(self)
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>"
cdef stop(self):
if not self._active:
return
self._timer_future.cancel()
self._active = False
cpython.Py_DECREF(self)

@ -42,9 +42,6 @@ IF UNAME_SYSNAME != "Windows":
include "_cygrpc/fork_posix.pxd.pxi" include "_cygrpc/fork_posix.pxd.pxi"
# Following pxi files are part of the Aio module # Following pxi files are part of the Aio module
include "_cygrpc/aio/iomgr/socket.pxd.pxi"
include "_cygrpc/aio/iomgr/timer.pxd.pxi"
include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
include "_cygrpc/aio/completion_queue.pxd.pxi" include "_cygrpc/aio/completion_queue.pxd.pxi"
include "_cygrpc/aio/rpc_status.pxd.pxi" include "_cygrpc/aio/rpc_status.pxd.pxi"
include "_cygrpc/aio/grpc_aio.pxd.pxi" include "_cygrpc/aio/grpc_aio.pxd.pxi"

@ -68,10 +68,6 @@ ELSE:
include "_cygrpc/fork_posix.pyx.pxi" include "_cygrpc/fork_posix.pyx.pxi"
# Following pxi files are part of the Aio module # Following pxi files are part of the Aio module
include "_cygrpc/aio/iomgr/iomgr.pyx.pxi"
include "_cygrpc/aio/iomgr/socket.pyx.pxi"
include "_cygrpc/aio/iomgr/timer.pyx.pxi"
include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
include "_cygrpc/aio/common.pyx.pxi" include "_cygrpc/aio/common.pyx.pxi"
include "_cygrpc/aio/rpc_status.pyx.pxi" include "_cygrpc/aio/rpc_status.pyx.pxi"
include "_cygrpc/aio/completion_queue.pyx.pxi" include "_cygrpc/aio/completion_queue.pyx.pxi"

Loading…
Cancel
Save