diff --git a/AUTHORS b/AUTHORS index 0e8797391f2..0c5fee8c739 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,3 +1,4 @@ Dropbox, Inc. Google Inc. +Skyscanner Ltd. WeWork Companies Inc. diff --git a/setup.py b/setup.py index 609b18e01b5..eed516a1f11 100644 --- a/setup.py +++ b/setup.py @@ -265,6 +265,7 @@ if 'darwin' in sys.platform and PY3: r'macosx-10.7-\1', util.get_platform()) + def cython_extensions_and_necessity(): cython_module_files = [os.path.join(PYTHON_STEM, name.replace('.', '/') + '.pyx') @@ -295,6 +296,8 @@ def cython_extensions_and_necessity(): need_cython = BUILD_WITH_CYTHON if not BUILD_WITH_CYTHON: need_cython = need_cython or not commands.check_and_update_cythonization(extensions) + # TODO: the strategy for conditional compiling and exposing the aio Cython + # dependencies will be revisited by https://github.com/grpc/grpc/issues/19728 return commands.try_cythonize(extensions, linetracing=ENABLE_CYTHON_TRACING, mandatory=BUILD_WITH_CYTHON), need_cython CYTHON_EXTENSION_MODULES, need_cython = cython_extensions_and_necessity() diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 18b1c92b9a7..e3cb89a81d3 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -8,6 +8,20 @@ pyx_library( "__init__.py", "_cygrpc/_hooks.pxd.pxi", "_cygrpc/_hooks.pyx.pxi", + "_cygrpc/aio/call.pxd.pxi", + "_cygrpc/aio/call.pyx.pxi", + "_cygrpc/aio/callbackcontext.pxd.pxi", + "_cygrpc/aio/channel.pxd.pxi", + "_cygrpc/aio/channel.pyx.pxi", + "_cygrpc/aio/grpc_aio.pxd.pxi", + "_cygrpc/aio/grpc_aio.pyx.pxi", + "_cygrpc/aio/iomgr/iomgr.pyx.pxi", + "_cygrpc/aio/iomgr/resolver.pxd.pxi", + "_cygrpc/aio/iomgr/resolver.pyx.pxi", + "_cygrpc/aio/iomgr/socket.pxd.pxi", + "_cygrpc/aio/iomgr/socket.pyx.pxi", + "_cygrpc/aio/iomgr/timer.pxd.pxi", + "_cygrpc/aio/iomgr/timer.pyx.pxi", "_cygrpc/arguments.pxd.pxi", "_cygrpc/arguments.pyx.pxi", "_cygrpc/call.pxd.pxi", @@ -27,6 +41,8 @@ pyx_library( "_cygrpc/grpc_gevent.pxd.pxi", "_cygrpc/grpc_gevent.pyx.pxi", "_cygrpc/grpc_string.pyx.pxi", + "_cygrpc/iomgr.pxd.pxi", + "_cygrpc/iomgr.pyx.pxi", "_cygrpc/metadata.pxd.pxi", "_cygrpc/metadata.pyx.pxi", "_cygrpc/operation.pxd.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi new file mode 100644 index 00000000000..1166551fd5c --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -0,0 +1,27 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _AioCall: + cdef: + AioChannel _channel + CallbackContext _watcher_call + grpc_completion_queue * _cq + grpc_experimental_completion_queue_functor _functor + object _waiter_call + + @staticmethod + cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed) + @staticmethod + cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi new file mode 100644 index 00000000000..9530a47f389 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -0,0 +1,149 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cimport cpython + +_EMPTY_FLAGS = 0 +_EMPTY_METADATA = () +_OP_ARRAY_LENGTH = 6 + + +cdef class _AioCall: + + + def __cinit__(self, AioChannel channel): + self._channel = channel + self._functor.functor_run = _AioCall.functor_run + + self._cq = grpc_completion_queue_create_for_callback( + &self._functor, + NULL + ) + + self._watcher_call.functor.functor_run = _AioCall.watcher_call_functor_run + self._watcher_call.waiter = self + self._waiter_call = None + + def __dealloc__(self): + grpc_completion_queue_shutdown(self._cq) + grpc_completion_queue_destroy(self._cq) + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + return f"<{class_name} {id_}>" + + @staticmethod + cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed): + pass + + @staticmethod + cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed): + call = <_AioCall>(functor).waiter + + assert call._waiter_call + + if succeed == 0: + call._waiter_call.set_exception(Exception("Some error ocurred")) + else: + call._waiter_call.set_result(None) + + async def unary_unary(self, method, request): + cdef grpc_call * call + cdef grpc_slice method_slice + cdef grpc_op * ops + + cdef Operation initial_metadata_operation + cdef Operation send_message_operation + cdef Operation send_close_from_client_operation + cdef Operation receive_initial_metadata_operation + cdef Operation receive_message_operation + cdef Operation receive_status_on_client_operation + + cdef grpc_call_error call_status + + + method_slice = grpc_slice_from_copied_buffer( + method, + len(method) + ) + + call = grpc_channel_create_call( + self._channel.channel, + NULL, + 0, + self._cq, + method_slice, + NULL, + _timespec_from_time(None), + NULL + ) + + grpc_slice_unref(method_slice) + + ops = gpr_malloc(sizeof(grpc_op) * _OP_ARRAY_LENGTH) + + initial_metadata_operation = SendInitialMetadataOperation(_EMPTY_METADATA, GRPC_INITIAL_METADATA_USED_MASK) + initial_metadata_operation.c() + ops[0] = initial_metadata_operation.c_op + + send_message_operation = SendMessageOperation(request, _EMPTY_FLAGS) + send_message_operation.c() + ops[1] = send_message_operation.c_op + + send_close_from_client_operation = SendCloseFromClientOperation(_EMPTY_FLAGS) + send_close_from_client_operation.c() + ops[2] = send_close_from_client_operation.c_op + + receive_initial_metadata_operation = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) + receive_initial_metadata_operation.c() + ops[3] = receive_initial_metadata_operation.c_op + + receive_message_operation = ReceiveMessageOperation(_EMPTY_FLAGS) + receive_message_operation.c() + ops[4] = receive_message_operation.c_op + + receive_status_on_client_operation = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) + receive_status_on_client_operation.c() + ops[5] = receive_status_on_client_operation.c_op + + self._waiter_call = asyncio.get_event_loop().create_future() + + call_status = grpc_call_start_batch( + call, + ops, + _OP_ARRAY_LENGTH, + &self._watcher_call.functor, + NULL + ) + + try: + if call_status != GRPC_CALL_OK: + self._waiter_call = None + raise Exception("Error with grpc_call_start_batch {}".format(call_status)) + + await self._waiter_call + + finally: + initial_metadata_operation.un_c() + send_message_operation.un_c() + send_close_from_client_operation.un_c() + receive_initial_metadata_operation.un_c() + receive_message_operation.un_c() + receive_status_on_client_operation.un_c() + + grpc_call_unref(call) + gpr_free(ops) + + return receive_message_operation.message() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi new file mode 100644 index 00000000000..8e52c856dd2 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi @@ -0,0 +1,20 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cimport cpython + +cdef struct CallbackContext: + grpc_experimental_completion_queue_functor functor + cpython.PyObject *waiter + diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi new file mode 100644 index 00000000000..f5e1a5d3095 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi @@ -0,0 +1,18 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class AioChannel: + cdef: + grpc_channel * channel + bytes _target diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi new file mode 100644 index 00000000000..b52c070553d --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -0,0 +1,30 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class AioChannel: + def __cinit__(self, bytes target): + self.channel = grpc_insecure_channel_create(target, NULL, NULL) + self._target = target + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + return f"<{class_name} {id_}>" + + def close(self): + grpc_channel_destroy(self.channel) + + async def unary_unary(self, method, request): + call = _AioCall(self) + return await call.unary_unary(method, request) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi new file mode 100644 index 00000000000..6cefb63d208 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi @@ -0,0 +1,25 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cdef extern from "src/core/lib/iomgr/timer_manager.h": + void grpc_timer_manager_set_threading(bint enabled); + +cdef extern from "src/core/lib/iomgr/iomgr_internal.h": + void grpc_set_default_iomgr_platform(); + +cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core": + cdef cppclass Executor: + @staticmethod + void SetThreadingAll(bint enable); diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi new file mode 100644 index 00000000000..64645a6c3b4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -0,0 +1,37 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef bint _grpc_aio_initialized = 0 + + +def init_grpc_aio(): + global _grpc_aio_initialized + + if _grpc_aio_initialized: + return + + install_asyncio_iomgr() + grpc_init() + + # Timers are triggered by the Asyncio loop. We disable + # the background thread that is being used by the native + # gRPC iomgr. + grpc_timer_manager_set_threading(0) + + # gRPC callbaks are executed within the same thread used by the Asyncio + # event loop, as it is being done by the other Asyncio callbacks. + Executor.SetThreadingAll(0) + + _grpc_aio_initialized = 1 diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi new file mode 100644 index 00000000000..13e95ee1206 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -0,0 +1,185 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from cpython cimport Py_INCREF, Py_DECREF + +from libc cimport string + +cdef grpc_socket_vtable asyncio_socket_vtable +cdef grpc_custom_resolver_vtable asyncio_resolver_vtable +cdef grpc_custom_timer_vtable asyncio_timer_vtable +cdef grpc_custom_poller_vtable asyncio_pollset_vtable + + +cdef grpc_error* asyncio_socket_init( + grpc_custom_socket* grpc_socket, + int domain) with gil: + socket = _AsyncioSocket.create(grpc_socket) + Py_INCREF(socket) + grpc_socket.impl = socket + return 0 + + +cdef void asyncio_socket_destroy(grpc_custom_socket* grpc_socket) with gil: + Py_DECREF(<_AsyncioSocket>grpc_socket.impl) + + +cdef void asyncio_socket_connect( + grpc_custom_socket* grpc_socket, + const grpc_sockaddr* addr, + size_t addr_len, + grpc_custom_connect_callback connect_cb) with gil: + + host, port = sockaddr_to_tuple(addr, addr_len) + socket = <_AsyncioSocket>grpc_socket.impl + socket.connect(host, port, connect_cb) + + +cdef void asyncio_socket_close( + grpc_custom_socket* grpc_socket, + grpc_custom_close_callback close_cb) with gil: + socket = (<_AsyncioSocket>grpc_socket.impl) + socket.close() + close_cb(grpc_socket) + + +cdef void asyncio_socket_shutdown(grpc_custom_socket* grpc_socket) with gil: + socket = (<_AsyncioSocket>grpc_socket.impl) + socket.close() + + +cdef void asyncio_socket_write( + grpc_custom_socket* grpc_socket, + grpc_slice_buffer* slice_buffer, + grpc_custom_write_callback write_cb) with gil: + socket = (<_AsyncioSocket>grpc_socket.impl) + socket.write(slice_buffer, write_cb) + + +cdef void asyncio_socket_read( + grpc_custom_socket* grpc_socket, + char* buffer_, + size_t length, + grpc_custom_read_callback read_cb) with gil: + socket = (<_AsyncioSocket>grpc_socket.impl) + socket.read(buffer_, length, read_cb) + + +cdef grpc_error* asyncio_socket_getpeername( + grpc_custom_socket* grpc_socket, + const grpc_sockaddr* addr, + int* length) with gil: + raise NotImplemented() + + +cdef grpc_error* asyncio_socket_getsockname( + grpc_custom_socket* grpc_socket, + const grpc_sockaddr* addr, + int* length) with gil: + raise NotImplemented() + + +cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil: + raise NotImplemented() + + +cdef grpc_error* asyncio_socket_bind( + grpc_custom_socket* grpc_socket, + const grpc_sockaddr* addr, + size_t len, int flags) with gil: + raise NotImplemented() + + +cdef void asyncio_socket_accept( + grpc_custom_socket* grpc_socket, + grpc_custom_socket* grpc_socket_client, + grpc_custom_accept_callback accept_cb) with gil: + raise NotImplemented() + + +cdef grpc_error* asyncio_resolve( + char* host, + char* port, + grpc_resolved_addresses** res) with gil: + raise NotImplemented() + + +cdef void asyncio_resolve_async( + grpc_custom_resolver* grpc_resolver, + char* host, + char* port) with gil: + resolver = _AsyncioResolver.create(grpc_resolver) + resolver.resolve(host, port) + + +cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil: + timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0) + Py_INCREF(timer) + grpc_timer.timer = timer + + +cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil: + timer = <_AsyncioTimer>grpc_timer.timer + timer.stop() + Py_DECREF(timer) + + +cdef void asyncio_init_loop() with gil: + pass + + +cdef void asyncio_destroy_loop() with gil: + pass + + +cdef void asyncio_kick_loop() with gil: + pass + + +cdef void asyncio_run_loop(size_t timeout_ms) with gil: + pass + + +def install_asyncio_iomgr(): + asyncio_resolver_vtable.resolve = asyncio_resolve + asyncio_resolver_vtable.resolve_async = asyncio_resolve_async + + asyncio_socket_vtable.init = asyncio_socket_init + asyncio_socket_vtable.connect = asyncio_socket_connect + asyncio_socket_vtable.destroy = asyncio_socket_destroy + asyncio_socket_vtable.shutdown = asyncio_socket_shutdown + asyncio_socket_vtable.close = asyncio_socket_close + asyncio_socket_vtable.write = asyncio_socket_write + asyncio_socket_vtable.read = asyncio_socket_read + asyncio_socket_vtable.getpeername = asyncio_socket_getpeername + asyncio_socket_vtable.getsockname = asyncio_socket_getsockname + asyncio_socket_vtable.bind = asyncio_socket_bind + asyncio_socket_vtable.listen = asyncio_socket_listen + asyncio_socket_vtable.accept = asyncio_socket_accept + + asyncio_timer_vtable.start = asyncio_timer_start + asyncio_timer_vtable.stop = asyncio_timer_stop + + asyncio_pollset_vtable.init = asyncio_init_loop + asyncio_pollset_vtable.poll = asyncio_run_loop + asyncio_pollset_vtable.kick = asyncio_kick_loop + asyncio_pollset_vtable.shutdown = asyncio_destroy_loop + + grpc_custom_iomgr_init( + &asyncio_socket_vtable, + &asyncio_resolver_vtable, + &asyncio_timer_vtable, + &asyncio_pollset_vtable + ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi new file mode 100644 index 00000000000..26089c95337 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi @@ -0,0 +1,23 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _AsyncioResolver: + cdef: + grpc_custom_resolver* _grpc_resolver + object _task_resolve + + @staticmethod + cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver) + + cdef void resolve(self, char* host, char* port) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi new file mode 100644 index 00000000000..4c102392e5c --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -0,0 +1,61 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _AsyncioResolver: + def __cinit__(self): + self._grpc_resolver = NULL + self._task_resolve = None + + @staticmethod + cdef _AsyncioResolver create(grpc_custom_resolver* grpc_resolver): + resolver = _AsyncioResolver() + resolver._grpc_resolver = grpc_resolver + return resolver + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + return f"<{class_name} {id_}>" + + def _resolve_cb(self, future): + error = False + try: + res = future.result() + except Exception as e: + error = True + finally: + self._task_resolve = None + + if not error: + grpc_custom_resolve_callback( + self._grpc_resolver, + tuples_to_resolvaddr(res), + 0 + ) + else: + grpc_custom_resolve_callback( + self._grpc_resolver, + NULL, + grpc_socket_error("getaddrinfo {}".format(str(e)).encode()) + ) + + cdef void resolve(self, char* host, char* port): + assert not self._task_resolve + + loop = asyncio.get_event_loop() + self._task_resolve = asyncio.ensure_future( + loop.getaddrinfo(host, port) + ) + self._task_resolve.add_done_callback(self._resolve_cb) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi new file mode 100644 index 00000000000..aab5db2149a --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -0,0 +1,34 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _AsyncioSocket: + cdef: + grpc_custom_socket * _grpc_socket + grpc_custom_connect_callback _grpc_connect_cb + grpc_custom_read_callback _grpc_read_cb + object _reader + object _writer + object _task_read + object _task_connect + char * _read_buffer + + @staticmethod + cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket) + + cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb) + cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb) + cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb) + cdef bint is_connected(self) + cdef void close(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi new file mode 100644 index 00000000000..690c34c2da9 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -0,0 +1,134 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket + +from libc cimport string + +cdef class _AsyncioSocket: + def __cinit__(self): + self._grpc_socket = NULL + self._grpc_connect_cb = NULL + self._grpc_read_cb = NULL + self._reader = None + self._writer = None + self._task_connect = None + self._task_read = None + self._read_buffer = NULL + + @staticmethod + cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket): + socket = _AsyncioSocket() + socket._grpc_socket = grpc_socket + return socket + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + connected = self.is_connected() + return f"<{class_name} {id_} connected={connected}>" + + def _connect_cb(self, future): + error = False + try: + self._reader, self._writer = future.result() + except Exception as e: + error = True + finally: + self._task_connect = None + + if not error: + # gRPC default posix implementation disables nagle + # algorithm. + sock = self._writer.transport.get_extra_info('socket') + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) + + self._grpc_connect_cb( + self._grpc_socket, + 0 + ) + else: + self._grpc_connect_cb( + self._grpc_socket, + grpc_socket_error("connect {}".format(str(e)).encode()) + ) + + def _read_cb(self, future): + error = False + try: + buffer_ = future.result() + except Exception as e: + error = True + error_msg = str(e) + finally: + self._task_read = None + + if not error: + string.memcpy( + self._read_buffer, + buffer_, + len(buffer_) + ) + self._grpc_read_cb( + self._grpc_socket, + len(buffer_), + 0 + ) + else: + self._grpc_read_cb( + self._grpc_socket, + -1, + grpc_socket_error("read {}".format(error_msg).encode()) + ) + + cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb): + assert not self._task_connect + + self._task_connect = asyncio.ensure_future( + asyncio.open_connection(host, port) + ) + self._grpc_connect_cb = grpc_connect_cb + self._task_connect.add_done_callback(self._connect_cb) + + cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb): + assert not self._task_read + + self._task_read = asyncio.ensure_future( + self._reader.read(n=length) + ) + self._grpc_read_cb = grpc_read_cb + self._task_read.add_done_callback(self._read_cb) + self._read_buffer = buffer_ + + cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb): + cdef char* start + buffer_ = bytearray() + for i in range(g_slice_buffer.count): + start = grpc_slice_buffer_start(g_slice_buffer, i) + length = grpc_slice_buffer_length(g_slice_buffer, i) + buffer_.extend(start[:length]) + + self._writer.write(buffer_) + + grpc_write_cb( + self._grpc_socket, + 0 + ) + + cdef bint is_connected(self): + return self._reader and not self._reader._transport.is_closing() + + cdef void close(self): + if self.is_connected(): + self._writer.close() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi new file mode 100644 index 00000000000..5af5dcd9282 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi @@ -0,0 +1,25 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _AsyncioTimer: + cdef: + grpc_custom_timer * _grpc_timer + object _deadline + object _timer_handler + int _active + + @staticmethod + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline) + + cdef stop(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi new file mode 100644 index 00000000000..e8edb4a5cf8 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi @@ -0,0 +1,45 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _AsyncioTimer: + def __cinit__(self): + self._grpc_timer = NULL + self._timer_handler = None + self._active = 0 + + @staticmethod + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline): + timer = _AsyncioTimer() + timer._grpc_timer = grpc_timer + timer._deadline = deadline + timer._timer_handler = asyncio.get_event_loop().call_later(deadline, timer._on_deadline) + timer._active = 1 + return timer + + def _on_deadline(self): + self._active = 0 + grpc_custom_timer_callback(self._grpc_timer, 0) + + def __repr__(self): + class_name = self.__class__.__name__ + id_ = id(self) + return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>" + + cdef stop(self): + if self._active == 0: + return + + self._timer_handler.cancel() + self._active = 0 diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 4bfb42026aa..e2117905421 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -41,6 +41,11 @@ cdef extern from "grpc/byte_buffer_reader.h": pass +cdef extern from "grpc/impl/codegen/grpc_types.h": + ctypedef struct grpc_experimental_completion_queue_functor: + void (*functor_run)(grpc_experimental_completion_queue_functor*, int); + + cdef extern from "grpc/grpc.h": ctypedef struct grpc_slice: @@ -325,6 +330,7 @@ cdef extern from "grpc/grpc.h": ctypedef struct grpc_op: grpc_op_type type "op" uint32_t flags + void * reserved grpc_op_data data void grpc_init() nogil @@ -350,6 +356,10 @@ cdef extern from "grpc/grpc.h": void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil + grpc_completion_queue *grpc_completion_queue_create_for_callback( + grpc_experimental_completion_queue_functor* shutdown_callback, + void *reserved) nogil + grpc_call_error grpc_call_start_batch( grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi index 30fdf6a7600..4f5033b8e44 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi @@ -13,115 +13,6 @@ # limitations under the License. # distutils: language=c++ -cdef extern from "grpc/impl/codegen/slice.h": - struct grpc_slice_buffer: - int count - -cdef extern from "src/core/lib/iomgr/error.h": - struct grpc_error: - pass - -cdef extern from "src/core/lib/iomgr/gevent_util.h": - grpc_error* grpc_socket_error(char* error) - char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) - int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) - -cdef extern from "src/core/lib/iomgr/sockaddr.h": - ctypedef struct grpc_sockaddr: - pass - -cdef extern from "src/core/lib/iomgr/resolve_address.h": - ctypedef struct grpc_resolved_addresses: - size_t naddrs - grpc_resolved_address* addrs - - ctypedef struct grpc_resolved_address: - char[128] addr - size_t len - -cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": - struct grpc_custom_resolver: - pass - - struct grpc_custom_resolver_vtable: - grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res); - void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port); - - void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, - grpc_resolved_addresses* result, - grpc_error* error); - -cdef extern from "src/core/lib/iomgr/tcp_custom.h": - struct grpc_custom_socket: - void* impl - # We don't care about the rest of the fields - ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, - grpc_error* error) - ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, - grpc_error* error) - ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, - size_t nread, grpc_error* error) - ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, - grpc_custom_socket* client, - grpc_error* error) - ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket) - - struct grpc_socket_vtable: - grpc_error* (*init)(grpc_custom_socket* socket, int domain); - void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, - size_t len, grpc_custom_connect_callback cb); - void (*destroy)(grpc_custom_socket* socket); - void (*shutdown)(grpc_custom_socket* socket); - void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); - void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, - grpc_custom_write_callback cb); - void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, - grpc_custom_read_callback cb); - grpc_error* (*getpeername)(grpc_custom_socket* socket, - const grpc_sockaddr* addr, int* len); - grpc_error* (*getsockname)(grpc_custom_socket* socket, - const grpc_sockaddr* addr, int* len); - grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, - size_t len, int flags); - grpc_error* (*listen)(grpc_custom_socket* socket); - void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, - grpc_custom_accept_callback cb); - -cdef extern from "src/core/lib/iomgr/timer_custom.h": - struct grpc_custom_timer: - void* timer - int timeout_ms - # We don't care about the rest of the fields - - struct grpc_custom_timer_vtable: - void (*start)(grpc_custom_timer* t); - void (*stop)(grpc_custom_timer* t); - - void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error); - -cdef extern from "src/core/lib/iomgr/pollset_custom.h": - struct grpc_custom_poller_vtable: - void (*init)() - void (*poll)(size_t timeout_ms) - void (*kick)() - void (*shutdown)() - -cdef extern from "src/core/lib/iomgr/iomgr_custom.h": - void grpc_custom_iomgr_init(grpc_socket_vtable* socket, - grpc_custom_resolver_vtable* resolver, - grpc_custom_timer_vtable* timer, - grpc_custom_poller_vtable* poller); - -cdef extern from "src/core/lib/iomgr/sockaddr_utils.h": - int grpc_sockaddr_get_port(const grpc_resolved_address *addr); - int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, - int normalize); - void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port); - int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, - int port) - const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr) - - cdef class TimerWrapper: cdef grpc_custom_timer *c_timer diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index a1618d04d0e..13256ed49b8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -15,7 +15,6 @@ cimport cpython from libc cimport string -from libc.stdlib cimport malloc, free import errno gevent_g = None gevent_socket = None @@ -24,51 +23,6 @@ gevent_event = None g_event = None g_pool = None -cdef grpc_error* grpc_error_none(): - return 0 - -cdef grpc_error* socket_error(str syscall, str err): - error_str = "{} failed: {}".format(syscall, err) - error_bytes = str_to_bytes(error_str) - return grpc_socket_error(error_bytes) - -cdef resolved_addr_to_tuple(grpc_resolved_address* address): - cdef char* res_str - port = grpc_sockaddr_get_port(address) - str_len = grpc_sockaddr_to_string(&res_str, address, 0) - byte_str = _decode(res_str[:str_len]) - if byte_str.endswith(':' + str(port)): - byte_str = byte_str[:(0 - len(str(port)) - 1)] - byte_str = byte_str.lstrip('[') - byte_str = byte_str.rstrip(']') - byte_str = '{}'.format(byte_str) - return byte_str, port - -cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): - cdef grpc_resolved_address c_addr - string.memcpy(c_addr.addr, address, length) - c_addr.len = length - return resolved_addr_to_tuple(&c_addr) - -cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): - cdef grpc_resolved_address c_addr - string.memcpy(c_addr.addr, address, length) - c_addr.len = length - return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' - -cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): - cdef grpc_resolved_addresses* addresses - tups_set = set((tup[4][0], tup[4][1]) for tup in tups) - addresses = malloc(sizeof(grpc_resolved_addresses)) - addresses.naddrs = len(tups_set) - addresses.addrs = malloc(sizeof(grpc_resolved_address) * len(tups_set)) - i = 0 - for tup in set(tups_set): - hostname = str_to_bytes(tup[0]) - grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) - i += 1 - return addresses - def _spawn_greenlet(*args): greenlet = g_pool.spawn(*args) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi new file mode 100644 index 00000000000..2f00bab3c0a --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi @@ -0,0 +1,124 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cdef extern from "grpc/impl/codegen/slice.h": + struct grpc_slice_buffer: + int count + +cdef extern from "src/core/lib/iomgr/error.h": + struct grpc_error: + pass + +# TODO(https://github.com/grpc/grpc/issues/20135) Change the filename +# for something more meaningful. +cdef extern from "src/core/lib/iomgr/gevent_util.h": + grpc_error* grpc_socket_error(char* error) + char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) + int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) + +cdef extern from "src/core/lib/iomgr/sockaddr.h": + ctypedef struct grpc_sockaddr: + pass + +cdef extern from "src/core/lib/iomgr/resolve_address.h": + ctypedef struct grpc_resolved_addresses: + size_t naddrs + grpc_resolved_address* addrs + + ctypedef struct grpc_resolved_address: + char[128] addr + size_t len + +cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": + struct grpc_custom_resolver: + pass + + struct grpc_custom_resolver_vtable: + grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res); + void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port); + + void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, + grpc_resolved_addresses* result, + grpc_error* error); + +cdef extern from "src/core/lib/iomgr/tcp_custom.h": + struct grpc_custom_socket: + void* impl + # We don't care about the rest of the fields + ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, + size_t nread, grpc_error* error) + ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, + grpc_custom_socket* client, + grpc_error* error) + ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket) + + struct grpc_socket_vtable: + grpc_error* (*init)(grpc_custom_socket* socket, int domain); + void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, grpc_custom_connect_callback cb); + void (*destroy)(grpc_custom_socket* socket); + void (*shutdown)(grpc_custom_socket* socket); + void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); + void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, + grpc_custom_write_callback cb); + void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, + grpc_custom_read_callback cb); + grpc_error* (*getpeername)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*getsockname)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, int flags); + grpc_error* (*listen)(grpc_custom_socket* socket); + void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb); + +cdef extern from "src/core/lib/iomgr/timer_custom.h": + struct grpc_custom_timer: + void* timer + int timeout_ms + # We don't care about the rest of the fields + + struct grpc_custom_timer_vtable: + void (*start)(grpc_custom_timer* t); + void (*stop)(grpc_custom_timer* t); + + void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error); + +cdef extern from "src/core/lib/iomgr/pollset_custom.h": + struct grpc_custom_poller_vtable: + void (*init)() + void (*poll)(size_t timeout_ms) + void (*kick)() + void (*shutdown)() + +cdef extern from "src/core/lib/iomgr/iomgr_custom.h": + void grpc_custom_iomgr_init(grpc_socket_vtable* socket, + grpc_custom_resolver_vtable* resolver, + grpc_custom_timer_vtable* timer, + grpc_custom_poller_vtable* poller); + +cdef extern from "src/core/lib/iomgr/sockaddr_utils.h": + int grpc_sockaddr_get_port(const grpc_resolved_address *addr); + int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, + int normalize); + void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port); + int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, + int port) + const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi new file mode 100644 index 00000000000..9274f1c5fdb --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi @@ -0,0 +1,62 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +from libc cimport string +from libc.stdlib cimport malloc + +cdef grpc_error* grpc_error_none(): + return 0 + +cdef grpc_error* socket_error(str syscall, str err): + error_str = "{} failed: {}".format(syscall, err) + error_bytes = str_to_bytes(error_str) + return grpc_socket_error(error_bytes) + +cdef resolved_addr_to_tuple(grpc_resolved_address* address): + cdef char* res_str + port = grpc_sockaddr_get_port(address) + str_len = grpc_sockaddr_to_string(&res_str, address, 0) + byte_str = _decode(res_str[:str_len]) + if byte_str.endswith(':' + str(port)): + byte_str = byte_str[:(0 - len(str(port)) - 1)] + byte_str = byte_str.lstrip('[') + byte_str = byte_str.rstrip(']') + byte_str = '{}'.format(byte_str) + return byte_str, port + +cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(c_addr.addr, address, length) + c_addr.len = length + return resolved_addr_to_tuple(&c_addr) + +cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(c_addr.addr, address, length) + c_addr.len = length + return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' + +cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): + cdef grpc_resolved_addresses* addresses + tups_set = set((tup[4][0], tup[4][1]) for tup in tups) + addresses = malloc(sizeof(grpc_resolved_addresses)) + addresses.naddrs = len(tups_set) + addresses.addrs = malloc(sizeof(grpc_resolved_address) * len(tups_set)) + i = 0 + for tup in set(tups_set): + hostname = str_to_bytes(tup[0]) + grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) + i += 1 + return addresses diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index e29f7aee97a..4d081fb83e2 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -32,7 +32,18 @@ include "_cygrpc/time.pxd.pxi" include "_cygrpc/vtable.pxd.pxi" include "_cygrpc/_hooks.pxd.pxi" +include "_cygrpc/iomgr.pxd.pxi" + include "_cygrpc/grpc_gevent.pxd.pxi" IF UNAME_SYSNAME != "Windows": include "_cygrpc/fork_posix.pxd.pxi" + +# Following pxi files are part of the Aio module +include "_cygrpc/aio/iomgr/socket.pxd.pxi" +include "_cygrpc/aio/iomgr/timer.pxd.pxi" +include "_cygrpc/aio/iomgr/resolver.pxd.pxi" +include "_cygrpc/aio/grpc_aio.pxd.pxi" +include "_cygrpc/aio/callbackcontext.pxd.pxi" +include "_cygrpc/aio/call.pxd.pxi" +include "_cygrpc/aio/channel.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index f2dd0df89d4..316fb993095 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -17,6 +17,13 @@ cimport cpython import os.path import sys +try: + import asyncio +except ImportError: + # TODO(https://github.com/grpc/grpc/issues/19728) Improve how Aio Cython is + # distributed without breaking none compatible Python versions. For now, if + # Asyncio package is not available we just skip it. + pass # TODO(atash): figure out why the coverage tool gets confused about the Cython # coverage plugin when the following files don't have a '.pxi' suffix. @@ -39,6 +46,8 @@ include "_cygrpc/time.pyx.pxi" include "_cygrpc/vtable.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" +include "_cygrpc/iomgr.pyx.pxi" + include "_cygrpc/grpc_gevent.pyx.pxi" IF UNAME_SYSNAME == "Windows": @@ -46,6 +55,16 @@ IF UNAME_SYSNAME == "Windows": ELSE: include "_cygrpc/fork_posix.pyx.pxi" +# Following pxi files are part of the Aio module +include "_cygrpc/aio/iomgr/iomgr.pyx.pxi" +include "_cygrpc/aio/iomgr/socket.pyx.pxi" +include "_cygrpc/aio/iomgr/timer.pyx.pxi" +include "_cygrpc/aio/iomgr/resolver.pyx.pxi" +include "_cygrpc/aio/grpc_aio.pyx.pxi" +include "_cygrpc/aio/call.pyx.pxi" +include "_cygrpc/aio/channel.pyx.pxi" + + # # initialize gRPC # diff --git a/src/python/grpcio/grpc/experimental/BUILD.bazel b/src/python/grpcio/grpc/experimental/BUILD.bazel index cd8afe533b6..00815c4e72e 100644 --- a/src/python/grpcio/grpc/experimental/BUILD.bazel +++ b/src/python/grpcio/grpc/experimental/BUILD.bazel @@ -1,9 +1,21 @@ package(default_visibility = ["//visibility:public"]) +py_library( + name = "aio", + srcs = [ + "aio/__init__.py", + "aio/_channel.py", + ], + deps = [ + "//src/python/grpcio/grpc/_cython:cygrpc", + ], +) + py_library( name = "experimental", srcs = ["__init__.py",], deps = [ + ":aio", ":gevent", ":session_cache", ], diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py new file mode 100644 index 00000000000..6004126549b --- /dev/null +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -0,0 +1,123 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""gRPC's Asynchronous Python API.""" + +import abc +import six + +from grpc._cython.cygrpc import init_grpc_aio + + +class Channel(six.with_metaclass(abc.ABCMeta)): + """Asynchronous Channel implementation.""" + + @abc.abstractmethod + def unary_unary(self, + method, + request_serializer=None, + response_deserializer=None): + """Creates a UnaryUnaryMultiCallable for a unary-unary method. + + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. + + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary method. + """ + raise NotImplementedError() + + @abc.abstractmethod + async def close(self): + """Closes this Channel and releases all resources held by it. + + Closing the Channel will proactively terminate all RPCs active with the + Channel and it is not valid to invoke new RPCs with the Channel. + + This method is idempotent. + """ + raise NotImplementedError() + + @abc.abstractmethod + async def __aenter__(self): + """Starts an asynchronous context manager. + + Returns: + Channel the channel that was instantiated. + """ + raise NotImplementedError() + + @abc.abstractmethod + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Finishes the asynchronous context manager by closing gracefully the channel.""" + raise NotImplementedError() + + +class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): + """Affords invoking a unary-unary RPC from client-side in an asynchronous way.""" + + @abc.abstractmethod + async def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None): + """Asynchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. Only valid for + secure Channel. + wait_for_ready: This is an EXPERIMENTAL argument. An optional + flag to enable wait for ready mechanism + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This is an EXPERIMENTAL option. + + Returns: + The response value for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + +def insecure_channel(target, options=None, compression=None): + """Creates an insecure asynchronous Channel to a server. + + Args: + target: The server address + options: An optional list of key-value pairs (channel args + in gRPC Core runtime) to configure the channel. + compression: An optional value indicating the compression method to be + used over the lifetime of the channel. This is an EXPERIMENTAL option. + + Returns: + A Channel. + """ + from grpc.experimental.aio import _channel # pylint: disable=cyclic-import + return _channel.Channel(target, () + if options is None else options, None, compression) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py new file mode 100644 index 00000000000..e3c8fcdbf2f --- /dev/null +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -0,0 +1,105 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Invocation-side implementation of gRPC Asyncio Python.""" + +from grpc import _common +from grpc._cython import cygrpc +from grpc.experimental import aio + + +class UnaryUnaryMultiCallable(aio.UnaryUnaryMultiCallable): + + def __init__(self, channel, method, request_serializer, + response_deserializer): + self._channel = channel + self._method = method + self._request_serializer = request_serializer + self._response_deserializer = response_deserializer + + async def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None): + + if timeout: + raise NotImplementedError("TODO: timeout not implemented yet") + + if metadata: + raise NotImplementedError("TODO: metadata not implemented yet") + + if credentials: + raise NotImplementedError("TODO: credentials not implemented yet") + + if wait_for_ready: + raise NotImplementedError( + "TODO: wait_for_ready not implemented yet") + + if compression: + raise NotImplementedError("TODO: compression not implemented yet") + + response = await self._channel.unary_unary( + self._method, _common.serialize(request, self._request_serializer)) + + return _common.deserialize(response, self._response_deserializer) + + +class Channel(aio.Channel): + """A cygrpc.AioChannel-backed implementation of grpc.experimental.aio.Channel.""" + + def __init__(self, target, options, credentials, compression): + """Constructor. + + Args: + target: The target to which to connect. + options: Configuration options for the channel. + credentials: A cygrpc.ChannelCredentials or None. + compression: An optional value indicating the compression method to be + used over the lifetime of the channel. + """ + + if options: + raise NotImplementedError("TODO: options not implemented yet") + + if credentials: + raise NotImplementedError("TODO: credentials not implemented yet") + + if compression: + raise NotImplementedError("TODO: compression not implemented yet") + + self._channel = cygrpc.AioChannel(_common.encode(target)) + + def unary_unary(self, + method, + request_serializer=None, + response_deserializer=None): + + return UnaryUnaryMultiCallable(self._channel, _common.encode(method), + request_serializer, + response_deserializer) + + async def _close(self): + # TODO: Send cancellation status + self._channel.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._close() + + async def close(self): + await self._close() diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 61d8bdc1f7b..2912ba113c9 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -107,6 +107,35 @@ class TestLite(setuptools.Command): self.distribution.fetch_build_eggs(self.distribution.tests_require) +class TestAio(setuptools.Command): + """Command to run aio tests without fetching or building anything.""" + + description = 'run aio tests without fetching or building anything.' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + self._add_eggs_to_path() + + import tests + loader = tests.Loader() + loader.loadTestsFromNames(['tests_aio']) + runner = tests.Runner() + result = runner.run(loader.suite) + if not result.wasSuccessful(): + sys.exit('Test failure') + + def _add_eggs_to_path(self): + """Fetch install and test requirements""" + self.distribution.fetch_build_eggs(self.distribution.install_requires) + self.distribution.fetch_build_eggs(self.distribution.tests_require) + + class TestGevent(setuptools.Command): """Command to run tests w/gevent.""" diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index f9cb9d0cec9..50ba1bb5942 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -58,6 +58,7 @@ COMMAND_CLASS = { 'run_interop': commands.RunInterop, 'test_lite': commands.TestLite, 'test_gevent': commands.TestGevent, + 'test_aio': commands.TestAio, } PACKAGE_DATA = { diff --git a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py index 7da6e7b34c3..6b4efdaca9f 100644 --- a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py +++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py @@ -25,17 +25,20 @@ class SanityTest(unittest.TestCase): maxDiff = 32768 + TEST_PKG_MODULE_NAME = 'tests' + TEST_PKG_PATH = 'tests' + def testTestsJsonUpToDate(self): """Autodiscovers all test suites and checks that tests.json is up to date""" loader = tests.Loader() - loader.loadTestsFromNames(['tests']) + loader.loadTestsFromNames([self.TEST_PKG_MODULE_NAME]) test_suite_names = sorted({ test_case_class.id().rsplit('.', 1)[0] for test_case_class in tests._loader.iterate_suite_cases( loader.suite) }) - tests_json_string = pkgutil.get_data('tests', 'tests.json') + tests_json_string = pkgutil.get_data(self.TEST_PKG_PATH, 'tests.json') tests_json = json.loads(tests_json_string.decode() if six.PY3 else tests_json_string) diff --git a/src/python/grpcio_tests/tests_aio/__init__.py b/src/python/grpcio_tests/tests_aio/__init__.py new file mode 100644 index 00000000000..8ddd3106965 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from tests import _loader +from tests import _runner + +Loader = _loader.Loader +Runner = _runner.Runner diff --git a/src/python/grpcio_tests/tests_aio/_sanity/__init__.py b/src/python/grpcio_tests/tests_aio/_sanity/__init__.py new file mode 100644 index 00000000000..f4b321fc5b2 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/_sanity/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests_aio/_sanity/_sanity_test.py new file mode 100644 index 00000000000..e74dec0739b --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/_sanity/_sanity_test.py @@ -0,0 +1,27 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from tests._sanity import _sanity_test + + +class AioSanityTest(_sanity_test.SanityTest): + + TEST_PKG_MODULE_NAME = 'tests_aio' + TEST_PKG_PATH = 'tests_aio' + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json new file mode 100644 index 00000000000..49d025a5abe --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -0,0 +1,5 @@ +[ + "_sanity._sanity_test.AioSanityTest", + "unit.channel_test.TestChannel", + "unit.init_test.TestInsecureChannel" +] diff --git a/src/python/grpcio_tests/tests_aio/unit/__init__.py b/src/python/grpcio_tests/tests_aio/unit/__init__.py new file mode 100644 index 00000000000..f4b321fc5b2 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py new file mode 100644 index 00000000000..6bc53ec625e --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -0,0 +1,58 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest + +from grpc.experimental import aio +from tests_aio.unit import test_base +from src.proto.grpc.testing import messages_pb2 + + +class TestChannel(test_base.AioTestBase): + + def test_async_context(self): + + async def coro(): + async with aio.insecure_channel(self.server_target) as channel: + hi = channel.unary_unary( + '/grpc.testing.TestService/UnaryCall', + request_serializer=messages_pb2.SimpleRequest. + SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString + ) + await hi(messages_pb2.SimpleRequest()) + + self.loop.run_until_complete(coro()) + + def test_unary_unary(self): + + async def coro(): + channel = aio.insecure_channel(self.server_target) + hi = channel.unary_unary( + '/grpc.testing.TestService/UnaryCall', + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString) + response = await hi(messages_pb2.SimpleRequest()) + + self.assertEqual(type(response), messages_pb2.SimpleResponse) + + await channel.close() + + self.loop.run_until_complete(coro()) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py new file mode 100644 index 00000000000..ab580f18e11 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -0,0 +1,35 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest + +from grpc.experimental import aio +from tests_aio.unit import test_base + + +class TestInsecureChannel(test_base.AioTestBase): + + def test_insecure_channel(self): + + async def coro(): + channel = aio.insecure_channel(self.server_target) + self.assertIsInstance(channel, aio.Channel) + + self.loop.run_until_complete(coro()) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/sync_server.py b/src/python/grpcio_tests/tests_aio/unit/sync_server.py new file mode 100644 index 00000000000..105ded8e76c --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/sync_server.py @@ -0,0 +1,50 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from concurrent import futures +from time import sleep + +import grpc +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import test_pb2_grpc + + +# TODO (https://github.com/grpc/grpc/issues/19762) +# Change for an asynchronous server version once it's implemented. +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + + def UnaryCall(self, request, context): + return messages_pb2.SimpleResponse() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Synchronous gRPC server.') + parser.add_argument( + '--host_and_port', + required=True, + type=str, + nargs=1, + help='the host and port to listen.') + args = parser.parse_args() + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 1),)) + test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), + server) + server.add_insecure_port(args.host_and_port[0]) + server.start() + server.wait_for_termination() diff --git a/src/python/grpcio_tests/tests_aio/unit/test_base.py b/src/python/grpcio_tests/tests_aio/unit/test_base.py new file mode 100644 index 00000000000..0b325523e0f --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/test_base.py @@ -0,0 +1,101 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import subprocess + +import asyncio +import unittest +import socket + +from grpc.experimental import aio +from tests_aio.unit import sync_server + + +def _get_free_loopback_tcp_port(): + if socket.has_ipv6: + tcp_socket = socket.socket(socket.AF_INET6) + host = "::1" + host_target = "[::1]" + else: + tcp_socket = socket.socket(socket.AF_INET) + host = "127.0.0.1" + host_target = host + tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + tcp_socket.bind((host, 0)) + address_tuple = tcp_socket.getsockname() + return tcp_socket, "%s:%s" % (host_target, address_tuple[1]) + + +class _Server: + """_Server is an wrapper for a sync-server subprocess. + + The synchronous server is executed in another process which initializes + implicitly the grpc using the synchronous configuration. Both worlds + can not coexist within the same process. + """ + + def __init__(self, host_and_port): # pylint: disable=W0621 + self._host_and_port = host_and_port + self._handle = None + + def start(self): + assert self._handle is None + + try: + from google3.pyglib import resources + executable = resources.GetResourceFilename( + "google3/third_party/py/grpc/sync_server") + args = [executable, '--host_and_port', self._host_and_port] + except ImportError: + executable = sys.executable + directory, _ = os.path.split(os.path.abspath(__file__)) + filename = directory + '/sync_server.py' + args = [ + executable, filename, '--host_and_port', self._host_and_port + ] + + self._handle = subprocess.Popen(args) + + def terminate(self): + if not self._handle: + return + + self._handle.terminate() + self._handle.wait() + self._handle = None + + +class AioTestBase(unittest.TestCase): + + def setUp(self): + self._socket, self._target = _get_free_loopback_tcp_port() + self._server = _Server(self._target) + self._server.start() + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + aio.init_grpc_aio() + + def tearDown(self): + self._server.terminate() + self._socket.close() + + @property + def loop(self): + return self._loop + + @property + def server_target(self): + return self._target diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index a3708dc2b80..3476f9ce4c8 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -703,6 +703,10 @@ class PythonConfig( class PythonLanguage(object): + _DEFAULT_COMMAND = 'test_lite' + _TEST_SPECS_FILE = 'src/python/grpcio_tests/tests/tests.json' + _TEST_FOLDER = 'test' + def configure(self, config, args): self.config = config self.args = args @@ -710,8 +714,7 @@ class PythonLanguage(object): def test_specs(self): # load list of known test suites - with open( - 'src/python/grpcio_tests/tests/tests.json') as tests_json_file: + with open(self._TEST_SPECS_FILE) as tests_json_file: tests_json = json.load(tests_json_file) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) return [ @@ -721,7 +724,8 @@ class PythonLanguage(object): environ=dict( list(environment.items()) + [( 'GRPC_PYTHON_TESTRUNNER_FILTER', str(suite_name))]), - shortname='%s.test.%s' % (config.name, suite_name), + shortname='%s.%s.%s' % (config.name, self._TEST_FOLDER, + suite_name), ) for suite_name in tests_json for config in self.pythons ] @@ -789,7 +793,7 @@ class PythonLanguage(object): venv_relative_python = ['bin/python'] toolchain = ['unix'] - test_command = 'test_lite' + test_command = self._DEFAULT_COMMAND if args.iomgr_platform == 'gevent': test_command = 'test_gevent' runner = [ @@ -882,6 +886,31 @@ class PythonLanguage(object): return 'python' +class PythonAioLanguage(PythonLanguage): + + _DEFAULT_COMMAND = 'test_aio' + _TEST_SPECS_FILE = 'src/python/grpcio_tests/tests_aio/tests.json' + _TEST_FOLDER = 'test_aio' + + def configure(self, config, args): + self.config = config + self.args = args + self.pythons = self._get_pythons(self.args) + + def _get_pythons(self, args): + """Get python runtimes to test with, based on current platform, architecture, compiler etc.""" + + if args.compiler not in ('python3.6', 'python3.7', 'python3.8'): + raise Exception('Compiler %s not supported.' % args.compiler) + if args.iomgr_platform not in ('native'): + raise Exception( + 'Iomgr platform %s not supported.' % args.iomgr_platform) + return super()._get_pythons(args) + + def __str__(self): + return 'python_aio' + + class RubyLanguage(object): def configure(self, config, args): @@ -1269,6 +1298,7 @@ _LANGUAGES = { 'php': PhpLanguage(), 'php7': Php7Language(), 'python': PythonLanguage(), + 'python-aio': PythonAioLanguage(), 'ruby': RubyLanguage(), 'csharp': CSharpLanguage(), 'objc': ObjCLanguage(),