[Aio] Unary unary client call barebones implementation

Implement the minimal stuff for making a unary call with the new
experimental gRPC Python implementation for Asyncio, called Aio.

What has been added:

- Minimal iomgr code for performing the required network and timer
calls.
- Minimal Cython code implementing the channel, call and the callback
context.
- Minimal Python code that mimics the synchronous implementation but
designed to be asynchronous.

Testing considerations:

Tests have to be executed using the `GRPC_ENABLE_FORK_SUPPORT=0`
environment variable for skipping the fork handles installed by
the core library. This is due to the usage of a syncrhonous server
used as a fixture executed in another process.

Co-authored-by: Manuel Miranda <manuel.miranda@skyscanner.net>
Co-authored-by: Mariano Anaya <mariano.anaya@skyscanner.net>
Co-authored-by: Zhanghui Mao <zhanghui.mao@skyscanner.net>
Co-authored-by: Lidi Zheng <lidiz@google.com>
pull/19960/head
Pau Freixes 5 years ago
parent 1bfdbc1f6c
commit a44e6d76b7
  1. 1
      AUTHORS
  2. 3
      setup.py
  3. 16
      src/python/grpcio/grpc/_cython/BUILD.bazel
  4. 27
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi
  5. 149
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  6. 20
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi
  7. 18
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
  8. 30
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  9. 25
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi
  10. 37
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  11. 185
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  12. 23
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi
  13. 61
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  14. 34
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  15. 134
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  16. 25
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi
  17. 45
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  18. 10
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  19. 109
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
  20. 46
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
  21. 124
      src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi
  22. 62
      src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi
  23. 11
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  24. 19
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  25. 12
      src/python/grpcio/grpc/experimental/BUILD.bazel
  26. 123
      src/python/grpcio/grpc/experimental/aio/__init__.py
  27. 105
      src/python/grpcio/grpc/experimental/aio/_channel.py
  28. 29
      src/python/grpcio_tests/commands.py
  29. 1
      src/python/grpcio_tests/setup.py
  30. 7
      src/python/grpcio_tests/tests/_sanity/_sanity_test.py
  31. 21
      src/python/grpcio_tests/tests_aio/__init__.py
  32. 13
      src/python/grpcio_tests/tests_aio/_sanity/__init__.py
  33. 27
      src/python/grpcio_tests/tests_aio/_sanity/_sanity_test.py
  34. 5
      src/python/grpcio_tests/tests_aio/tests.json
  35. 13
      src/python/grpcio_tests/tests_aio/unit/__init__.py
  36. 58
      src/python/grpcio_tests/tests_aio/unit/channel_test.py
  37. 35
      src/python/grpcio_tests/tests_aio/unit/init_test.py
  38. 50
      src/python/grpcio_tests/tests_aio/unit/sync_server.py
  39. 101
      src/python/grpcio_tests/tests_aio/unit/test_base.py
  40. 38
      tools/run_tests/run_tests.py

@ -1,3 +1,4 @@
Dropbox, Inc.
Google Inc.
Skyscanner Ltd.
WeWork Companies Inc.

@ -265,6 +265,7 @@ if 'darwin' in sys.platform and PY3:
r'macosx-10.7-\1',
util.get_platform())
def cython_extensions_and_necessity():
cython_module_files = [os.path.join(PYTHON_STEM,
name.replace('.', '/') + '.pyx')
@ -295,6 +296,8 @@ def cython_extensions_and_necessity():
need_cython = BUILD_WITH_CYTHON
if not BUILD_WITH_CYTHON:
need_cython = need_cython or not commands.check_and_update_cythonization(extensions)
# TODO: the strategy for conditional compiling and exposing the aio Cython
# dependencies will be revisited by https://github.com/grpc/grpc/issues/19728
return commands.try_cythonize(extensions, linetracing=ENABLE_CYTHON_TRACING, mandatory=BUILD_WITH_CYTHON), need_cython
CYTHON_EXTENSION_MODULES, need_cython = cython_extensions_and_necessity()

@ -8,6 +8,20 @@ pyx_library(
"__init__.py",
"_cygrpc/_hooks.pxd.pxi",
"_cygrpc/_hooks.pyx.pxi",
"_cygrpc/aio/call.pxd.pxi",
"_cygrpc/aio/call.pyx.pxi",
"_cygrpc/aio/callbackcontext.pxd.pxi",
"_cygrpc/aio/channel.pxd.pxi",
"_cygrpc/aio/channel.pyx.pxi",
"_cygrpc/aio/grpc_aio.pxd.pxi",
"_cygrpc/aio/grpc_aio.pyx.pxi",
"_cygrpc/aio/iomgr/iomgr.pyx.pxi",
"_cygrpc/aio/iomgr/resolver.pxd.pxi",
"_cygrpc/aio/iomgr/resolver.pyx.pxi",
"_cygrpc/aio/iomgr/socket.pxd.pxi",
"_cygrpc/aio/iomgr/socket.pyx.pxi",
"_cygrpc/aio/iomgr/timer.pxd.pxi",
"_cygrpc/aio/iomgr/timer.pyx.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pxd.pxi",
@ -27,6 +41,8 @@ pyx_library(
"_cygrpc/grpc_gevent.pxd.pxi",
"_cygrpc/grpc_gevent.pyx.pxi",
"_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/iomgr.pxd.pxi",
"_cygrpc/iomgr.pyx.pxi",
"_cygrpc/metadata.pxd.pxi",
"_cygrpc/metadata.pyx.pxi",
"_cygrpc/operation.pxd.pxi",

@ -0,0 +1,27 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AioCall:
cdef:
AioChannel _channel
CallbackContext _watcher_call
grpc_completion_queue * _cq
grpc_experimental_completion_queue_functor _functor
object _waiter_call
@staticmethod
cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed)
@staticmethod
cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed)

@ -0,0 +1,149 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cimport cpython
_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
_OP_ARRAY_LENGTH = 6
cdef class _AioCall:
def __cinit__(self, AioChannel channel):
self._channel = channel
self._functor.functor_run = _AioCall.functor_run
self._cq = grpc_completion_queue_create_for_callback(
<grpc_experimental_completion_queue_functor *> &self._functor,
NULL
)
self._watcher_call.functor.functor_run = _AioCall.watcher_call_functor_run
self._watcher_call.waiter = <cpython.PyObject *> self
self._waiter_call = None
def __dealloc__(self):
grpc_completion_queue_shutdown(self._cq)
grpc_completion_queue_destroy(self._cq)
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
@staticmethod
cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed):
pass
@staticmethod
cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed):
call = <_AioCall>(<CallbackContext *>functor).waiter
assert call._waiter_call
if succeed == 0:
call._waiter_call.set_exception(Exception("Some error ocurred"))
else:
call._waiter_call.set_result(None)
async def unary_unary(self, method, request):
cdef grpc_call * call
cdef grpc_slice method_slice
cdef grpc_op * ops
cdef Operation initial_metadata_operation
cdef Operation send_message_operation
cdef Operation send_close_from_client_operation
cdef Operation receive_initial_metadata_operation
cdef Operation receive_message_operation
cdef Operation receive_status_on_client_operation
cdef grpc_call_error call_status
method_slice = grpc_slice_from_copied_buffer(
<const char *> method,
<size_t> len(method)
)
call = grpc_channel_create_call(
self._channel.channel,
NULL,
0,
self._cq,
method_slice,
NULL,
_timespec_from_time(None),
NULL
)
grpc_slice_unref(method_slice)
ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * _OP_ARRAY_LENGTH)
initial_metadata_operation = SendInitialMetadataOperation(_EMPTY_METADATA, GRPC_INITIAL_METADATA_USED_MASK)
initial_metadata_operation.c()
ops[0] = <grpc_op> initial_metadata_operation.c_op
send_message_operation = SendMessageOperation(request, _EMPTY_FLAGS)
send_message_operation.c()
ops[1] = <grpc_op> send_message_operation.c_op
send_close_from_client_operation = SendCloseFromClientOperation(_EMPTY_FLAGS)
send_close_from_client_operation.c()
ops[2] = <grpc_op> send_close_from_client_operation.c_op
receive_initial_metadata_operation = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
receive_initial_metadata_operation.c()
ops[3] = <grpc_op> receive_initial_metadata_operation.c_op
receive_message_operation = ReceiveMessageOperation(_EMPTY_FLAGS)
receive_message_operation.c()
ops[4] = <grpc_op> receive_message_operation.c_op
receive_status_on_client_operation = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
receive_status_on_client_operation.c()
ops[5] = <grpc_op> receive_status_on_client_operation.c_op
self._waiter_call = asyncio.get_event_loop().create_future()
call_status = grpc_call_start_batch(
call,
ops,
_OP_ARRAY_LENGTH,
&self._watcher_call.functor,
NULL
)
try:
if call_status != GRPC_CALL_OK:
self._waiter_call = None
raise Exception("Error with grpc_call_start_batch {}".format(call_status))
await self._waiter_call
finally:
initial_metadata_operation.un_c()
send_message_operation.un_c()
send_close_from_client_operation.un_c()
receive_initial_metadata_operation.un_c()
receive_message_operation.un_c()
receive_status_on_client_operation.un_c()
grpc_call_unref(call)
gpr_free(ops)
return receive_message_operation.message()

@ -0,0 +1,20 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cimport cpython
cdef struct CallbackContext:
grpc_experimental_completion_queue_functor functor
cpython.PyObject *waiter

@ -0,0 +1,18 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class AioChannel:
cdef:
grpc_channel * channel
bytes _target

@ -0,0 +1,30 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class AioChannel:
def __cinit__(self, bytes target):
self.channel = grpc_insecure_channel_create(<char *>target, NULL, NULL)
self._target = target
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
def close(self):
grpc_channel_destroy(self.channel)
async def unary_unary(self, method, request):
call = _AioCall(self)
return await call.unary_unary(method, request)

@ -0,0 +1,25 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
cdef extern from "src/core/lib/iomgr/timer_manager.h":
void grpc_timer_manager_set_threading(bint enabled);
cdef extern from "src/core/lib/iomgr/iomgr_internal.h":
void grpc_set_default_iomgr_platform();
cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core":
cdef cppclass Executor:
@staticmethod
void SetThreadingAll(bint enable);

@ -0,0 +1,37 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef bint _grpc_aio_initialized = 0
def init_grpc_aio():
global _grpc_aio_initialized
if _grpc_aio_initialized:
return
install_asyncio_iomgr()
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(0)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(0)
_grpc_aio_initialized = 1

@ -0,0 +1,185 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cpython cimport Py_INCREF, Py_DECREF
from libc cimport string
cdef grpc_socket_vtable asyncio_socket_vtable
cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
cdef grpc_custom_timer_vtable asyncio_timer_vtable
cdef grpc_custom_poller_vtable asyncio_pollset_vtable
cdef grpc_error* asyncio_socket_init(
grpc_custom_socket* grpc_socket,
int domain) with gil:
socket = _AsyncioSocket.create(grpc_socket)
Py_INCREF(socket)
grpc_socket.impl = <void*>socket
return <grpc_error*>0
cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil:
Py_DECREF(<_AsyncioSocket>grpc_socket.impl)
cdef void asyncio_socket_connect(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback connect_cb) with gil:
host, port = sockaddr_to_tuple(addr, addr_len)
socket = <_AsyncioSocket>grpc_socket.impl
socket.connect(host, port, connect_cb)
cdef void asyncio_socket_close(
grpc_custom_socket* grpc_socket,
grpc_custom_close_callback close_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.close()
close_cb(grpc_socket)
cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.close()
cdef void asyncio_socket_write(
grpc_custom_socket* grpc_socket,
grpc_slice_buffer* slice_buffer,
grpc_custom_write_callback write_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.write(slice_buffer, write_cb)
cdef void asyncio_socket_read(
grpc_custom_socket* grpc_socket,
char* buffer_,
size_t length,
grpc_custom_read_callback read_cb) with gil:
socket = (<_AsyncioSocket>grpc_socket.impl)
socket.read(buffer_, length, read_cb)
cdef grpc_error* asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
cdef grpc_error* asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
raise NotImplemented()
cdef grpc_error* asyncio_socket_bind(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
raise NotImplemented()
cdef void asyncio_socket_accept(
grpc_custom_socket* grpc_socket,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback accept_cb) with gil:
raise NotImplemented()
cdef grpc_error* asyncio_resolve(
char* host,
char* port,
grpc_resolved_addresses** res) with gil:
raise NotImplemented()
cdef void asyncio_resolve_async(
grpc_custom_resolver* grpc_resolver,
char* host,
char* port) with gil:
resolver = _AsyncioResolver.create(grpc_resolver)
resolver.resolve(host, port)
cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0)
Py_INCREF(timer)
grpc_timer.timer = <void*>timer
cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
timer = <_AsyncioTimer>grpc_timer.timer
timer.stop()
Py_DECREF(timer)
cdef void asyncio_init_loop() with gil:
pass
cdef void asyncio_destroy_loop() with gil:
pass
cdef void asyncio_kick_loop() with gil:
pass
cdef void asyncio_run_loop(size_t timeout_ms) with gil:
pass
def install_asyncio_iomgr():
asyncio_resolver_vtable.resolve = asyncio_resolve
asyncio_resolver_vtable.resolve_async = asyncio_resolve_async
asyncio_socket_vtable.init = asyncio_socket_init
asyncio_socket_vtable.connect = asyncio_socket_connect
asyncio_socket_vtable.destroy = asyncio_socket_destroy
asyncio_socket_vtable.shutdown = asyncio_socket_shutdown
asyncio_socket_vtable.close = asyncio_socket_close
asyncio_socket_vtable.write = asyncio_socket_write
asyncio_socket_vtable.read = asyncio_socket_read
asyncio_socket_vtable.getpeername = asyncio_socket_getpeername
asyncio_socket_vtable.getsockname = asyncio_socket_getsockname
asyncio_socket_vtable.bind = asyncio_socket_bind
asyncio_socket_vtable.listen = asyncio_socket_listen
asyncio_socket_vtable.accept = asyncio_socket_accept
asyncio_timer_vtable.start = asyncio_timer_start
asyncio_timer_vtable.stop = asyncio_timer_stop
asyncio_pollset_vtable.init = asyncio_init_loop
asyncio_pollset_vtable.poll = asyncio_run_loop
asyncio_pollset_vtable.kick = asyncio_kick_loop
asyncio_pollset_vtable.shutdown = asyncio_destroy_loop
grpc_custom_iomgr_init(
&asyncio_socket_vtable,
&asyncio_resolver_vtable,
&asyncio_timer_vtable,
&asyncio_pollset_vtable
)

@ -0,0 +1,23 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioResolver:
cdef:
grpc_custom_resolver* _grpc_resolver
object _task_resolve
@staticmethod
cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver)
cdef void resolve(self, char* host, char* port)

@ -0,0 +1,61 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioResolver:
def __cinit__(self):
self._grpc_resolver = NULL
self._task_resolve = None
@staticmethod
cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver):
resolver = _AsyncioResolver()
resolver._grpc_resolver = grpc_resolver
return resolver
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
def _resolve_cb(self, future):
error = False
try:
res = future.result()
except Exception as e:
error = True
finally:
self._task_resolve = None
if not error:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
tuples_to_resolvaddr(res),
<grpc_error*>0
)
else:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
NULL,
grpc_socket_error("getaddrinfo {}".format(str(e)).encode())
)
cdef void resolve(self, char* host, char* port):
assert not self._task_resolve
loop = asyncio.get_event_loop()
self._task_resolve = asyncio.ensure_future(
loop.getaddrinfo(host, port)
)
self._task_resolve.add_done_callback(self._resolve_cb)

@ -0,0 +1,34 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioSocket:
cdef:
grpc_custom_socket * _grpc_socket
grpc_custom_connect_callback _grpc_connect_cb
grpc_custom_read_callback _grpc_read_cb
object _reader
object _writer
object _task_read
object _task_connect
char * _read_buffer
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket)
cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb)
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb)
cdef bint is_connected(self)
cdef void close(self)

@ -0,0 +1,134 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from libc cimport string
cdef class _AsyncioSocket:
def __cinit__(self):
self._grpc_socket = NULL
self._grpc_connect_cb = NULL
self._grpc_read_cb = NULL
self._reader = None
self._writer = None
self._task_connect = None
self._task_read = None
self._read_buffer = NULL
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
return socket
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
connected = self.is_connected()
return f"<{class_name} {id_} connected={connected}>"
def _connect_cb(self, future):
error = False
try:
self._reader, self._writer = future.result()
except Exception as e:
error = True
finally:
self._task_connect = None
if not error:
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error*>0
)
else:
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
grpc_socket_error("connect {}".format(str(e)).encode())
)
def _read_cb(self, future):
error = False
try:
buffer_ = future.result()
except Exception as e:
error = True
error_msg = str(e)
finally:
self._task_read = None
if not error:
string.memcpy(
<void*>self._read_buffer,
<char*>buffer_,
len(buffer_)
)
self._grpc_read_cb(
<grpc_custom_socket*>self._grpc_socket,
len(buffer_),
<grpc_error*>0
)
else:
self._grpc_read_cb(
<grpc_custom_socket*>self._grpc_socket,
-1,
grpc_socket_error("read {}".format(error_msg).encode())
)
cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb):
assert not self._task_connect
self._task_connect = asyncio.ensure_future(
asyncio.open_connection(host, port)
)
self._grpc_connect_cb = grpc_connect_cb
self._task_connect.add_done_callback(self._connect_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb):
assert not self._task_read
self._task_read = asyncio.ensure_future(
self._reader.read(n=length)
)
self._grpc_read_cb = grpc_read_cb
self._task_read.add_done_callback(self._read_cb)
self._read_buffer = buffer_
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb):
cdef char* start
buffer_ = bytearray()
for i in range(g_slice_buffer.count):
start = grpc_slice_buffer_start(g_slice_buffer, i)
length = grpc_slice_buffer_length(g_slice_buffer, i)
buffer_.extend(<bytes>start[:length])
self._writer.write(buffer_)
grpc_write_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error*>0
)
cdef bint is_connected(self):
return self._reader and not self._reader._transport.is_closing()
cdef void close(self):
if self.is_connected():
self._writer.close()

@ -0,0 +1,25 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioTimer:
cdef:
grpc_custom_timer * _grpc_timer
object _deadline
object _timer_handler
int _active
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline)
cdef stop(self)

@ -0,0 +1,45 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _AsyncioTimer:
def __cinit__(self):
self._grpc_timer = NULL
self._timer_handler = None
self._active = 0
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline):
timer = _AsyncioTimer()
timer._grpc_timer = grpc_timer
timer._deadline = deadline
timer._timer_handler = asyncio.get_event_loop().call_later(deadline, timer._on_deadline)
timer._active = 1
return timer
def _on_deadline(self):
self._active = 0
grpc_custom_timer_callback(self._grpc_timer, <grpc_error*>0)
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>"
cdef stop(self):
if self._active == 0:
return
self._timer_handler.cancel()
self._active = 0

@ -41,6 +41,11 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass
cdef extern from "grpc/impl/codegen/grpc_types.h":
ctypedef struct grpc_experimental_completion_queue_functor:
void (*functor_run)(grpc_experimental_completion_queue_functor*, int);
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice:
@ -325,6 +330,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_op:
grpc_op_type type "op"
uint32_t flags
void * reserved
grpc_op_data data
void grpc_init() nogil
@ -350,6 +356,10 @@ cdef extern from "grpc/grpc.h":
void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil
void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
grpc_completion_queue *grpc_completion_queue_create_for_callback(
grpc_experimental_completion_queue_functor* shutdown_callback,
void *reserved) nogil
grpc_call_error grpc_call_start_batch(
grpc_call *call, const grpc_op *ops, size_t nops, void *tag,
void *reserved) nogil

@ -13,115 +13,6 @@
# limitations under the License.
# distutils: language=c++
cdef extern from "grpc/impl/codegen/slice.h":
struct grpc_slice_buffer:
int count
cdef extern from "src/core/lib/iomgr/error.h":
struct grpc_error:
pass
cdef extern from "src/core/lib/iomgr/gevent_util.h":
grpc_error* grpc_socket_error(char* error)
char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
cdef extern from "src/core/lib/iomgr/sockaddr.h":
ctypedef struct grpc_sockaddr:
pass
cdef extern from "src/core/lib/iomgr/resolve_address.h":
ctypedef struct grpc_resolved_addresses:
size_t naddrs
grpc_resolved_address* addrs
ctypedef struct grpc_resolved_address:
char[128] addr
size_t len
cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
struct grpc_custom_resolver:
pass
struct grpc_custom_resolver_vtable:
grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
grpc_resolved_addresses* result,
grpc_error* error);
cdef extern from "src/core/lib/iomgr/tcp_custom.h":
struct grpc_custom_socket:
void* impl
# We don't care about the rest of the fields
ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
size_t nread, grpc_error* error)
ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
grpc_custom_socket* client,
grpc_error* error)
ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
struct grpc_socket_vtable:
grpc_error* (*init)(grpc_custom_socket* socket, int domain);
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, grpc_custom_connect_callback cb);
void (*destroy)(grpc_custom_socket* socket);
void (*shutdown)(grpc_custom_socket* socket);
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
grpc_custom_write_callback cb);
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
grpc_custom_read_callback cb);
grpc_error* (*getpeername)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags);
grpc_error* (*listen)(grpc_custom_socket* socket);
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb);
cdef extern from "src/core/lib/iomgr/timer_custom.h":
struct grpc_custom_timer:
void* timer
int timeout_ms
# We don't care about the rest of the fields
struct grpc_custom_timer_vtable:
void (*start)(grpc_custom_timer* t);
void (*stop)(grpc_custom_timer* t);
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)()
void (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()
cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
grpc_custom_timer_vtable* timer,
grpc_custom_poller_vtable* poller);
cdef extern from "src/core/lib/iomgr/sockaddr_utils.h":
int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr,
int normalize);
void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);
int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
int port)
const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)
cdef class TimerWrapper:
cdef grpc_custom_timer *c_timer

@ -15,7 +15,6 @@
cimport cpython
from libc cimport string
from libc.stdlib cimport malloc, free
import errno
gevent_g = None
gevent_socket = None
@ -24,51 +23,6 @@ gevent_event = None
g_event = None
g_pool = None
cdef grpc_error* grpc_error_none():
return <grpc_error*>0
cdef grpc_error* socket_error(str syscall, str err):
error_str = "{} failed: {}".format(syscall, err)
error_bytes = str_to_bytes(error_str)
return grpc_socket_error(error_bytes)
cdef resolved_addr_to_tuple(grpc_resolved_address* address):
cdef char* res_str
port = grpc_sockaddr_get_port(address)
str_len = grpc_sockaddr_to_string(&res_str, address, 0)
byte_str = _decode(<bytes>res_str[:str_len])
if byte_str.endswith(':' + str(port)):
byte_str = byte_str[:(0 - len(str(port)) - 1)]
byte_str = byte_str.lstrip('[')
byte_str = byte_str.rstrip(']')
byte_str = '{}'.format(byte_str)
return byte_str, port
cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return resolved_addr_to_tuple(&c_addr)
cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
cdef grpc_resolved_addresses* addresses
tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
addresses.naddrs = len(tups_set)
addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
i = 0
for tup in set(tups_set):
hostname = str_to_bytes(tup[0])
grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
i += 1
return addresses
def _spawn_greenlet(*args):
greenlet = g_pool.spawn(*args)

@ -0,0 +1,124 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
cdef extern from "grpc/impl/codegen/slice.h":
struct grpc_slice_buffer:
int count
cdef extern from "src/core/lib/iomgr/error.h":
struct grpc_error:
pass
# TODO(https://github.com/grpc/grpc/issues/20135) Change the filename
# for something more meaningful.
cdef extern from "src/core/lib/iomgr/gevent_util.h":
grpc_error* grpc_socket_error(char* error)
char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
cdef extern from "src/core/lib/iomgr/sockaddr.h":
ctypedef struct grpc_sockaddr:
pass
cdef extern from "src/core/lib/iomgr/resolve_address.h":
ctypedef struct grpc_resolved_addresses:
size_t naddrs
grpc_resolved_address* addrs
ctypedef struct grpc_resolved_address:
char[128] addr
size_t len
cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
struct grpc_custom_resolver:
pass
struct grpc_custom_resolver_vtable:
grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
grpc_resolved_addresses* result,
grpc_error* error);
cdef extern from "src/core/lib/iomgr/tcp_custom.h":
struct grpc_custom_socket:
void* impl
# We don't care about the rest of the fields
ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
size_t nread, grpc_error* error)
ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
grpc_custom_socket* client,
grpc_error* error)
ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
struct grpc_socket_vtable:
grpc_error* (*init)(grpc_custom_socket* socket, int domain);
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, grpc_custom_connect_callback cb);
void (*destroy)(grpc_custom_socket* socket);
void (*shutdown)(grpc_custom_socket* socket);
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
grpc_custom_write_callback cb);
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
grpc_custom_read_callback cb);
grpc_error* (*getpeername)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags);
grpc_error* (*listen)(grpc_custom_socket* socket);
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb);
cdef extern from "src/core/lib/iomgr/timer_custom.h":
struct grpc_custom_timer:
void* timer
int timeout_ms
# We don't care about the rest of the fields
struct grpc_custom_timer_vtable:
void (*start)(grpc_custom_timer* t);
void (*stop)(grpc_custom_timer* t);
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)()
void (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()
cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
grpc_custom_timer_vtable* timer,
grpc_custom_poller_vtable* poller);
cdef extern from "src/core/lib/iomgr/sockaddr_utils.h":
int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr,
int normalize);
void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);
int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
int port)
const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)

@ -0,0 +1,62 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
from libc cimport string
from libc.stdlib cimport malloc
cdef grpc_error* grpc_error_none():
return <grpc_error*>0
cdef grpc_error* socket_error(str syscall, str err):
error_str = "{} failed: {}".format(syscall, err)
error_bytes = str_to_bytes(error_str)
return grpc_socket_error(error_bytes)
cdef resolved_addr_to_tuple(grpc_resolved_address* address):
cdef char* res_str
port = grpc_sockaddr_get_port(address)
str_len = grpc_sockaddr_to_string(&res_str, address, 0)
byte_str = _decode(<bytes>res_str[:str_len])
if byte_str.endswith(':' + str(port)):
byte_str = byte_str[:(0 - len(str(port)) - 1)]
byte_str = byte_str.lstrip('[')
byte_str = byte_str.rstrip(']')
byte_str = '{}'.format(byte_str)
return byte_str, port
cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return resolved_addr_to_tuple(&c_addr)
cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
cdef grpc_resolved_addresses* addresses
tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
addresses.naddrs = len(tups_set)
addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
i = 0
for tup in set(tups_set):
hostname = str_to_bytes(tup[0])
grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
i += 1
return addresses

@ -32,7 +32,18 @@ include "_cygrpc/time.pxd.pxi"
include "_cygrpc/vtable.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
include "_cygrpc/iomgr.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"
IF UNAME_SYSNAME != "Windows":
include "_cygrpc/fork_posix.pxd.pxi"
# Following pxi files are part of the Aio module
include "_cygrpc/aio/iomgr/socket.pxd.pxi"
include "_cygrpc/aio/iomgr/timer.pxd.pxi"
include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
include "_cygrpc/aio/grpc_aio.pxd.pxi"
include "_cygrpc/aio/callbackcontext.pxd.pxi"
include "_cygrpc/aio/call.pxd.pxi"
include "_cygrpc/aio/channel.pxd.pxi"

@ -17,6 +17,13 @@ cimport cpython
import os.path
import sys
try:
import asyncio
except ImportError:
# TODO(https://github.com/grpc/grpc/issues/19728) Improve how Aio Cython is
# distributed without breaking none compatible Python versions. For now, if
# Asyncio package is not available we just skip it.
pass
# TODO(atash): figure out why the coverage tool gets confused about the Cython
# coverage plugin when the following files don't have a '.pxi' suffix.
@ -39,6 +46,8 @@ include "_cygrpc/time.pyx.pxi"
include "_cygrpc/vtable.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/iomgr.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
IF UNAME_SYSNAME == "Windows":
@ -46,6 +55,16 @@ IF UNAME_SYSNAME == "Windows":
ELSE:
include "_cygrpc/fork_posix.pyx.pxi"
# Following pxi files are part of the Aio module
include "_cygrpc/aio/iomgr/iomgr.pyx.pxi"
include "_cygrpc/aio/iomgr/socket.pyx.pxi"
include "_cygrpc/aio/iomgr/timer.pyx.pxi"
include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
include "_cygrpc/aio/grpc_aio.pyx.pxi"
include "_cygrpc/aio/call.pyx.pxi"
include "_cygrpc/aio/channel.pyx.pxi"
#
# initialize gRPC
#

@ -1,9 +1,21 @@
package(default_visibility = ["//visibility:public"])
py_library(
name = "aio",
srcs = [
"aio/__init__.py",
"aio/_channel.py",
],
deps = [
"//src/python/grpcio/grpc/_cython:cygrpc",
],
)
py_library(
name = "experimental",
srcs = ["__init__.py",],
deps = [
":aio",
":gevent",
":session_cache",
],

@ -0,0 +1,123 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC's Asynchronous Python API."""
import abc
import six
from grpc._cython.cygrpc import init_grpc_aio
class Channel(six.with_metaclass(abc.ABCMeta)):
"""Asynchronous Channel implementation."""
@abc.abstractmethod
def unary_unary(self,
method,
request_serializer=None,
response_deserializer=None):
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
Args:
method: The name of the RPC method.
request_serializer: Optional behaviour for serializing the request
message. Request goes unserialized in case None is passed.
response_deserializer: Optional behaviour for deserializing the
response message. Response goes undeserialized in case None
is passed.
Returns:
A UnaryUnaryMultiCallable value for the named unary-unary method.
"""
raise NotImplementedError()
@abc.abstractmethod
async def close(self):
"""Closes this Channel and releases all resources held by it.
Closing the Channel will proactively terminate all RPCs active with the
Channel and it is not valid to invoke new RPCs with the Channel.
This method is idempotent.
"""
raise NotImplementedError()
@abc.abstractmethod
async def __aenter__(self):
"""Starts an asynchronous context manager.
Returns:
Channel the channel that was instantiated.
"""
raise NotImplementedError()
@abc.abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Finishes the asynchronous context manager by closing gracefully the channel."""
raise NotImplementedError()
class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC from client-side in an asynchronous way."""
@abc.abstractmethod
async def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None):
"""Asynchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow
for the RPC.
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC. Only valid for
secure Channel.
wait_for_ready: This is an EXPERIMENTAL argument. An optional
flag to enable wait for ready mechanism
compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This is an EXPERIMENTAL option.
Returns:
The response value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
def insecure_channel(target, options=None, compression=None):
"""Creates an insecure asynchronous Channel to a server.
Args:
target: The server address
options: An optional list of key-value pairs (channel args
in gRPC Core runtime) to configure the channel.
compression: An optional value indicating the compression method to be
used over the lifetime of the channel. This is an EXPERIMENTAL option.
Returns:
A Channel.
"""
from grpc.experimental.aio import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, ()
if options is None else options, None, compression)

@ -0,0 +1,105 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invocation-side implementation of gRPC Asyncio Python."""
from grpc import _common
from grpc._cython import cygrpc
from grpc.experimental import aio
class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable):
def __init__(self, channel, method, request_serializer,
response_deserializer):
self._channel = channel
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
async def __call__(self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None):
if timeout:
raise NotImplementedError("TODO: timeout not implemented yet")
if metadata:
raise NotImplementedError("TODO: metadata not implemented yet")
if credentials:
raise NotImplementedError("TODO: credentials not implemented yet")
if wait_for_ready:
raise NotImplementedError(
"TODO: wait_for_ready not implemented yet")
if compression:
raise NotImplementedError("TODO: compression not implemented yet")
response = await self._channel.unary_unary(
self._method, _common.serialize(request, self._request_serializer))
return _common.deserialize(response, self._response_deserializer)
class Channel(aio.Channel):
"""A cygrpc.AioChannel-backed implementation of grpc.experimental.aio.Channel."""
def __init__(self, target, options, credentials, compression):
"""Constructor.
Args:
target: The target to which to connect.
options: Configuration options for the channel.
credentials: A cygrpc.ChannelCredentials or None.
compression: An optional value indicating the compression method to be
used over the lifetime of the channel.
"""
if options:
raise NotImplementedError("TODO: options not implemented yet")
if credentials:
raise NotImplementedError("TODO: credentials not implemented yet")
if compression:
raise NotImplementedError("TODO: compression not implemented yet")
self._channel = cygrpc.AioChannel(_common.encode(target))
def unary_unary(self,
method,
request_serializer=None,
response_deserializer=None):
return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
request_serializer,
response_deserializer)
async def _close(self):
# TODO: Send cancellation status
self._channel.close()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._close()
async def close(self):
await self._close()

@ -107,6 +107,35 @@ class TestLite(setuptools.Command):
self.distribution.fetch_build_eggs(self.distribution.tests_require)
class TestAio(setuptools.Command):
"""Command to run aio tests without fetching or building anything."""
description = 'run aio tests without fetching or building anything.'
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
self._add_eggs_to_path()
import tests
loader = tests.Loader()
loader.loadTestsFromNames(['tests_aio'])
runner = tests.Runner()
result = runner.run(loader.suite)
if not result.wasSuccessful():
sys.exit('Test failure')
def _add_eggs_to_path(self):
"""Fetch install and test requirements"""
self.distribution.fetch_build_eggs(self.distribution.install_requires)
self.distribution.fetch_build_eggs(self.distribution.tests_require)
class TestGevent(setuptools.Command):
"""Command to run tests w/gevent."""

@ -58,6 +58,7 @@ COMMAND_CLASS = {
'run_interop': commands.RunInterop,
'test_lite': commands.TestLite,
'test_gevent': commands.TestGevent,
'test_aio': commands.TestAio,
}
PACKAGE_DATA = {

@ -25,17 +25,20 @@ class SanityTest(unittest.TestCase):
maxDiff = 32768
TEST_PKG_MODULE_NAME = 'tests'
TEST_PKG_PATH = 'tests'
def testTestsJsonUpToDate(self):
"""Autodiscovers all test suites and checks that tests.json is up to date"""
loader = tests.Loader()
loader.loadTestsFromNames(['tests'])
loader.loadTestsFromNames([self.TEST_PKG_MODULE_NAME])
test_suite_names = sorted({
test_case_class.id().rsplit('.', 1)[0]
for test_case_class in tests._loader.iterate_suite_cases(
loader.suite)
})
tests_json_string = pkgutil.get_data('tests', 'tests.json')
tests_json_string = pkgutil.get_data(self.TEST_PKG_PATH, 'tests.json')
tests_json = json.loads(tests_json_string.decode()
if six.PY3 else tests_json_string)

@ -0,0 +1,21 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from tests import _loader
from tests import _runner
Loader = _loader.Loader
Runner = _runner.Runner

@ -0,0 +1,13 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,27 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from tests._sanity import _sanity_test
class AioSanityTest(_sanity_test.SanityTest):
TEST_PKG_MODULE_NAME = 'tests_aio'
TEST_PKG_PATH = 'tests_aio'
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,5 @@
[
"_sanity._sanity_test.AioSanityTest",
"unit.channel_test.TestChannel",
"unit.init_test.TestInsecureChannel"
]

@ -0,0 +1,13 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,58 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import unittest
from grpc.experimental import aio
from tests_aio.unit import test_base
from src.proto.grpc.testing import messages_pb2
class TestChannel(test_base.AioTestBase):
def test_async_context(self):
async def coro():
async with aio.insecure_channel(self.server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.
SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString
)
await hi(messages_pb2.SimpleRequest())
self.loop.run_until_complete(coro())
def test_unary_unary(self):
async def coro():
channel = aio.insecure_channel(self.server_target)
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
response = await hi(messages_pb2.SimpleRequest())
self.assertEqual(type(response), messages_pb2.SimpleResponse)
await channel.close()
self.loop.run_until_complete(coro())
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)

@ -0,0 +1,35 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import unittest
from grpc.experimental import aio
from tests_aio.unit import test_base
class TestInsecureChannel(test_base.AioTestBase):
def test_insecure_channel(self):
async def coro():
channel = aio.insecure_channel(self.server_target)
self.assertIsInstance(channel, aio.Channel)
self.loop.run_until_complete(coro())
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)

@ -0,0 +1,50 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from concurrent import futures
from time import sleep
import grpc
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
# TODO (https://github.com/grpc/grpc/issues/19762)
# Change for an asynchronous server version once it's implemented.
class TestServiceServicer(test_pb2_grpc.TestServiceServicer):
def UnaryCall(self, request, context):
return messages_pb2.SimpleResponse()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Synchronous gRPC server.')
parser.add_argument(
'--host_and_port',
required=True,
type=str,
nargs=1,
help='the host and port to listen.')
args = parser.parse_args()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(('grpc.so_reuseport', 1),))
test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(),
server)
server.add_insecure_port(args.host_and_port[0])
server.start()
server.wait_for_termination()

@ -0,0 +1,101 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import subprocess
import asyncio
import unittest
import socket
from grpc.experimental import aio
from tests_aio.unit import sync_server
def _get_free_loopback_tcp_port():
if socket.has_ipv6:
tcp_socket = socket.socket(socket.AF_INET6)
host = "::1"
host_target = "[::1]"
else:
tcp_socket = socket.socket(socket.AF_INET)
host = "127.0.0.1"
host_target = host
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
tcp_socket.bind((host, 0))
address_tuple = tcp_socket.getsockname()
return tcp_socket, "%s:%s" % (host_target, address_tuple[1])
class _Server:
"""_Server is an wrapper for a sync-server subprocess.
The synchronous server is executed in another process which initializes
implicitly the grpc using the synchronous configuration. Both worlds
can not coexist within the same process.
"""
def __init__(self, host_and_port): # pylint: disable=W0621
self._host_and_port = host_and_port
self._handle = None
def start(self):
assert self._handle is None
try:
from google3.pyglib import resources
executable = resources.GetResourceFilename(
"google3/third_party/py/grpc/sync_server")
args = [executable, '--host_and_port', self._host_and_port]
except ImportError:
executable = sys.executable
directory, _ = os.path.split(os.path.abspath(__file__))
filename = directory + '/sync_server.py'
args = [
executable, filename, '--host_and_port', self._host_and_port
]
self._handle = subprocess.Popen(args)
def terminate(self):
if not self._handle:
return
self._handle.terminate()
self._handle.wait()
self._handle = None
class AioTestBase(unittest.TestCase):
def setUp(self):
self._socket, self._target = _get_free_loopback_tcp_port()
self._server = _Server(self._target)
self._server.start()
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
aio.init_grpc_aio()
def tearDown(self):
self._server.terminate()
self._socket.close()
@property
def loop(self):
return self._loop
@property
def server_target(self):
return self._target

@ -703,6 +703,10 @@ class PythonConfig(
class PythonLanguage(object):
_DEFAULT_COMMAND = 'test_lite'
_TEST_SPECS_FILE = 'src/python/grpcio_tests/tests/tests.json'
_TEST_FOLDER = 'test'
def configure(self, config, args):
self.config = config
self.args = args
@ -710,8 +714,7 @@ class PythonLanguage(object):
def test_specs(self):
# load list of known test suites
with open(
'src/python/grpcio_tests/tests/tests.json') as tests_json_file:
with open(self._TEST_SPECS_FILE) as tests_json_file:
tests_json = json.load(tests_json_file)
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
return [
@ -721,7 +724,8 @@ class PythonLanguage(object):
environ=dict(
list(environment.items()) + [(
'GRPC_PYTHON_TESTRUNNER_FILTER', str(suite_name))]),
shortname='%s.test.%s' % (config.name, suite_name),
shortname='%s.%s.%s' % (config.name, self._TEST_FOLDER,
suite_name),
) for suite_name in tests_json for config in self.pythons
]
@ -789,7 +793,7 @@ class PythonLanguage(object):
venv_relative_python = ['bin/python']
toolchain = ['unix']
test_command = 'test_lite'
test_command = self._DEFAULT_COMMAND
if args.iomgr_platform == 'gevent':
test_command = 'test_gevent'
runner = [
@ -882,6 +886,31 @@ class PythonLanguage(object):
return 'python'
class PythonAioLanguage(PythonLanguage):
_DEFAULT_COMMAND = 'test_aio'
_TEST_SPECS_FILE = 'src/python/grpcio_tests/tests_aio/tests.json'
_TEST_FOLDER = 'test_aio'
def configure(self, config, args):
self.config = config
self.args = args
self.pythons = self._get_pythons(self.args)
def _get_pythons(self, args):
"""Get python runtimes to test with, based on current platform, architecture, compiler etc."""
if args.compiler not in ('python3.6', 'python3.7', 'python3.8'):
raise Exception('Compiler %s not supported.' % args.compiler)
if args.iomgr_platform not in ('native'):
raise Exception(
'Iomgr platform %s not supported.' % args.iomgr_platform)
return super()._get_pythons(args)
def __str__(self):
return 'python_aio'
class RubyLanguage(object):
def configure(self, config, args):
@ -1269,6 +1298,7 @@ _LANGUAGES = {
'php': PhpLanguage(),
'php7': Php7Language(),
'python': PythonLanguage(),
'python-aio': PythonAioLanguage(),
'ruby': RubyLanguage(),
'csharp': CSharpLanguage(),
'objc': ObjCLanguage(),

Loading…
Cancel
Save