diff --git a/.gitignore b/.gitignore index 0f3cd78947d..cde82bc3d36 100644 --- a/.gitignore +++ b/.gitignore @@ -15,8 +15,10 @@ python_pylint_venv/ htmlcov/ dist/ *.egg -py27/ -py3[0-9]*/ +py27_gevent/ +py27_native/ +py3[0-9]_gevent/ +py3[0-9]_native/ # Node installation output node_modules diff --git a/setup.py b/setup.py index 7c07c5614c0..a45c3503824 100644 --- a/setup.py +++ b/setup.py @@ -103,9 +103,9 @@ ENABLE_DOCUMENTATION_BUILD = os.environ.get( EXTRA_ENV_COMPILE_ARGS = os.environ.get('GRPC_PYTHON_CFLAGS', None) EXTRA_ENV_LINK_ARGS = os.environ.get('GRPC_PYTHON_LDFLAGS', None) if EXTRA_ENV_COMPILE_ARGS is None: - EXTRA_ENV_COMPILE_ARGS = '' + EXTRA_ENV_COMPILE_ARGS = ' -std=c++11' if 'win32' in sys.platform and sys.version_info < (3, 5): - EXTRA_ENV_COMPILE_ARGS += ' -std=c++11' + EXTRA_ENV_COMPILE_ARGS += ' -D_hypot=hypot' # We use define flags here and don't directly add to DEFINE_MACROS below to # ensure that the expert user/builder has a way of turning it off (via the # envvars) without adding yet more GRPC-specific envvars. @@ -115,10 +115,10 @@ if EXTRA_ENV_COMPILE_ARGS is None: else: EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime64 -D_timeb=__timeb64' elif "linux" in sys.platform: - EXTRA_ENV_COMPILE_ARGS += ' -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions' + EXTRA_ENV_COMPILE_ARGS += ' -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions' elif "darwin" in sys.platform: EXTRA_ENV_COMPILE_ARGS += ' -fvisibility=hidden -fno-wrapv -fno-exceptions' -EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_16BIT' +EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_16BIT' if EXTRA_ENV_LINK_ARGS is None: EXTRA_ENV_LINK_ARGS = '' @@ -181,7 +181,7 @@ LDFLAGS = tuple(EXTRA_LINK_ARGS) CFLAGS = tuple(EXTRA_COMPILE_ARGS) if "linux" in sys.platform or "darwin" in sys.platform: pymodinit_type = 'PyObject*' if PY3 else 'void' - pymodinit = '__attribute__((visibility ("default"))) {}'.format(pymodinit_type) + pymodinit = 'extern "C" __attribute__((visibility ("default"))) {}'.format(pymodinit_type) DEFINE_MACROS += (('PyMODINIT_FUNC', pymodinit),) DEFINE_MACROS += (('GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK', 1),) diff --git a/src/core/lib/iomgr/gevent_util.h b/src/core/lib/iomgr/gevent_util.h new file mode 100644 index 00000000000..de30543e3a3 --- /dev/null +++ b/src/core/lib/iomgr/gevent_util.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_GEVENT_UTIL_H +#define GRPC_CORE_LIB_IOMGR_GEVENT_UTIL_H + +#include + +#include +#include +#include "src/core/lib/iomgr/error.h" + +// These are only used by the gRPC Python extension for gevent +// support. They are easier to define here (rather than in Cython) +// because Cython doesn't handle #defines well. + +grpc_error* grpc_socket_error(char* error) { + return grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); +} + +char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) { + return (char*)GRPC_SLICE_START_PTR(buffer->slices[i]); +} + +int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) { + return GRPC_SLICE_LENGTH(buffer->slices[i]); +} + +#endif diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index c1dcc52618d..a397012003c 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -22,8 +22,10 @@ #define GRPC_CORE_LIB_IOMGR_PORT_H #ifdef GRPC_UV +#ifndef GRPC_CUSTOM_SOCKET #define GRPC_CUSTOM_SOCKET #endif +#endif #if defined(GRPC_CUSTOM_SOCKET) // Do Nothing #elif defined(GPR_MANYLINUX1) diff --git a/src/core/lib/iomgr/sockaddr_utils.cc b/src/core/lib/iomgr/sockaddr_utils.cc index df25f7778a6..1b66dceb130 100644 --- a/src/core/lib/iomgr/sockaddr_utils.cc +++ b/src/core/lib/iomgr/sockaddr_utils.cc @@ -204,6 +204,8 @@ void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port) { if (grpc_inet_pton(GRPC_AF_INET6, addr, &addr6->sin6_addr) == 1) { addr6->sin6_family = GRPC_AF_INET6; + addr6->sin6_flowinfo = 0; + addr6->sin6_scope_id = 0; out->len = sizeof(grpc_sockaddr_in6); } else if (grpc_inet_pton(GRPC_AF_INET, addr, &addr4->sin_addr) == 1) { addr4->sin_family = GRPC_AF_INET; diff --git a/src/python/grpcio/grpc/_cython/.gitignore b/src/python/grpcio/grpc/_cython/.gitignore index 306e3ad277f..b9936e932ca 100644 --- a/src/python/grpcio/grpc/_cython/.gitignore +++ b/src/python/grpcio/grpc/_cython/.gitignore @@ -1,4 +1,4 @@ -cygrpc.c +cygrpc.cpp *.a *.so *.dll diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd new file mode 100644 index 00000000000..f35aebadde4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd @@ -0,0 +1,154 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +from libc.stdint cimport uint32_t + +cdef extern from "grpc/impl/codegen/slice.h": + struct grpc_slice_buffer: + int count + +cdef extern from "src/core/lib/iomgr/error.h": + struct grpc_error: + pass + +cdef extern from "src/core/lib/iomgr/gevent_util.h": + grpc_error* grpc_socket_error(char* error) + char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) + int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) + +cdef extern from "src/core/lib/iomgr/sockaddr.h": + ctypedef struct grpc_sockaddr: + pass + +cdef extern from "src/core/lib/iomgr/resolve_address.h": + ctypedef struct grpc_resolved_addresses: + size_t naddrs + grpc_resolved_address* addrs + + ctypedef struct grpc_resolved_address: + char[128] addr + size_t len + +cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": + struct grpc_custom_resolver: + pass + + struct grpc_custom_resolver_vtable: + grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res); + void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port); + + void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, + grpc_resolved_addresses* result, + grpc_error* error); + +cdef extern from "src/core/lib/iomgr/tcp_custom.h": + struct grpc_custom_socket: + void* impl + # We don't care about the rest of the fields + ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, + size_t nread, grpc_error* error) + ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, + grpc_custom_socket* client, + grpc_error* error) + ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket) + + struct grpc_socket_vtable: + grpc_error* (*init)(grpc_custom_socket* socket, int domain); + void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, grpc_custom_connect_callback cb); + void (*destroy)(grpc_custom_socket* socket); + void (*shutdown)(grpc_custom_socket* socket); + void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); + void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, + grpc_custom_write_callback cb); + void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, + grpc_custom_read_callback cb); + grpc_error* (*getpeername)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*getsockname)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname, + const void* optval, uint32_t len); + grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, int flags); + grpc_error* (*listen)(grpc_custom_socket* socket); + void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb); + +cdef extern from "src/core/lib/iomgr/timer_custom.h": + struct grpc_custom_timer: + void* timer + int timeout_ms + # We don't care about the rest of the fields + + struct grpc_custom_timer_vtable: + void (*start)(grpc_custom_timer* t); + void (*stop)(grpc_custom_timer* t); + + void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error); + +cdef extern from "src/core/lib/iomgr/pollset_custom.h": + struct grpc_custom_poller_vtable: + void (*init)() + void (*poll)(size_t timeout_ms) + void (*kick)() + void (*shutdown)() + +cdef extern from "src/core/lib/iomgr/iomgr_custom.h": + void grpc_custom_iomgr_init(grpc_socket_vtable* socket, + grpc_custom_resolver_vtable* resolver, + grpc_custom_timer_vtable* timer, + grpc_custom_poller_vtable* poller); + +cdef extern from "src/core/lib/iomgr/sockaddr_utils.h": + int grpc_sockaddr_get_port(const grpc_resolved_address *addr); + int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, + int normalize); + void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port); + int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, + int port) + const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr) + + +cdef class TimerWrapper: + + cdef grpc_custom_timer *c_timer + cdef object timer + cdef object event + +cdef class SocketWrapper: + cdef object sockopts + cdef object socket + cdef object closed + cdef grpc_custom_socket *c_socket + cdef char* c_buffer + cdef size_t len + cdef grpc_custom_socket *accepting_socket + + cdef grpc_custom_connect_callback connect_cb + cdef grpc_custom_write_callback write_cb + cdef grpc_custom_read_callback read_cb + cdef grpc_custom_accept_callback accept_cb + cdef grpc_custom_close_callback close_cb + + +cdef class ResolveWrapper: + cdef grpc_custom_resolver *c_resolver + cdef char* c_host + cdef char* c_port diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx new file mode 100644 index 00000000000..ba8c7318914 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx @@ -0,0 +1,454 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cimport cpython +from libc cimport string +from libc.stdlib cimport malloc, free +import errno +gevent_g = None +gevent_socket = None +gevent_hub = None +gevent_event = None +g_event = None +g_pool = None + +cdef grpc_error* grpc_error_none(): + return 0 + +cdef grpc_error* socket_error(str syscall, str err): + error_str = "{} failed: {}".format(syscall, err) + error_bytes = str_to_bytes(error_str) + return grpc_socket_error(error_bytes) + +cdef resolved_addr_to_tuple(grpc_resolved_address* address): + cdef char* res_str + port = grpc_sockaddr_get_port(address) + str_len = grpc_sockaddr_to_string(&res_str, address, 0) + byte_str = _decode(res_str[:str_len]) + if byte_str.endswith(':' + str(port)): + byte_str = byte_str[:(0 - len(str(port)) - 1)] + byte_str = byte_str.lstrip('[') + byte_str = byte_str.rstrip(']') + byte_str = '{}'.format(byte_str) + return byte_str, port + +cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(c_addr.addr, address, length) + c_addr.len = length + return resolved_addr_to_tuple(&c_addr) + +cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(c_addr.addr, address, length) + c_addr.len = length + return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' + +cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): + cdef grpc_resolved_addresses* addresses + tups_set = set((tup[4][0], tup[4][1]) for tup in tups) + addresses = malloc(sizeof(grpc_resolved_addresses)) + addresses.naddrs = len(tups_set) + addresses.addrs = malloc(sizeof(grpc_resolved_address) * len(tups_set)) + i = 0 + for tup in set(tups_set): + hostname = str_to_bytes(tup[0]) + grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) + i += 1 + return addresses + +def _spawn_greenlet(*args): + greenlet = g_pool.spawn(*args) + +############################### +### socket implementation ### +############################### + +cdef class SocketWrapper: + def __cinit__(self): + self.sockopts = [] + self.socket = None + self.c_socket = NULL + self.c_buffer = NULL + self.len = 0 + +cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil: + sw = SocketWrapper() + sw.c_socket = socket + sw.sockopts = [] + cpython.Py_INCREF(sw) + # Python doesn't support AF_UNSPEC sockets, so we defer creation until + # bind/connect when we know what type of socket we need + sw.socket = None + sw.closed = False + sw.accepting_socket = NULL + socket.impl = sw + return grpc_error_none() + +cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple): + try: + socket_wrapper.socket.connect(addr_tuple) + socket_wrapper.connect_cb(socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.connect_cb(socket_wrapper.c_socket, + socket_error("connect", str(io_error))) + g_event.set() + +def socket_connect_async(socket_wrapper, addr_tuple): + socket_connect_async_cython(socket_wrapper, addr_tuple) + +cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t addr_len, + grpc_custom_connect_callback cb) with gil: + py_socket = None + socket_wrapper = socket.impl + socket_wrapper.connect_cb = cb + addr_tuple = sockaddr_to_tuple(addr, addr_len) + if sockaddr_is_ipv4(addr, addr_len): + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + else: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + socket_wrapper.socket = py_socket + _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple) + +cdef void socket_destroy(grpc_custom_socket* socket) with gil: + cpython.Py_DECREF(socket.impl) + +cdef void socket_shutdown(grpc_custom_socket* socket) with gil: + try: + (socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR) + except IOError as io_error: + if io_error.errno != errno.ENOTCONN: + raise io_error + +cdef void socket_close(grpc_custom_socket* socket, + grpc_custom_close_callback cb) with gil: + socket_wrapper = (socket.impl) + if socket_wrapper.socket is not None: + socket_wrapper.socket.close() + socket_wrapper.closed = True + socket_wrapper.close_cb = cb + # Delay the close callback until the accept() call has picked it up + if socket_wrapper.accepting_socket != NULL: + return + socket_wrapper.close_cb(socket) + +def socket_sendmsg(socket, write_bytes): + try: + return socket.sendmsg(write_bytes) + except AttributeError: + # sendmsg not available on all Pythons/Platforms + return socket.send(b''.join(write_bytes)) + +cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes): + try: + while write_bytes: + sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes) + while sent_byte_count > 0: + if sent_byte_count < len(write_bytes[0]): + write_bytes[0] = write_bytes[0][sent_byte_count:] + sent_byte_count = 0 + else: + sent_byte_count -= len(write_bytes[0]) + write_bytes = write_bytes[1:] + socket_wrapper.write_cb(socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.write_cb(socket_wrapper.c_socket, + socket_error("send", str(io_error))) + g_event.set() + +def socket_write_async(socket_wrapper, write_bytes): + socket_write_async_cython(socket_wrapper, write_bytes) + +cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer, + grpc_custom_write_callback cb) with gil: + cdef char* start + sw = socket.impl + sw.write_cb = cb + write_bytes = [] + for i in range(buffer.count): + start = grpc_slice_buffer_start(buffer, i) + length = grpc_slice_buffer_length(buffer, i) + write_bytes.append(start[:length]) + _spawn_greenlet(socket_write_async, socket.impl, write_bytes) + +cdef socket_read_async_cython(SocketWrapper socket_wrapper): + cdef char* buff_char_arr + try: + buff_str = socket_wrapper.socket.recv(socket_wrapper.len) + buff_char_arr = buff_str + string.memcpy(socket_wrapper.c_buffer, buff_char_arr, len(buff_str)) + socket_wrapper.read_cb(socket_wrapper.c_socket, + len(buff_str), grpc_error_none()) + except IOError as io_error: + socket_wrapper.read_cb(socket_wrapper.c_socket, + -1, socket_error("recv", str(io_error))) + g_event.set() + +def socket_read_async(socket_wrapper): + socket_read_async_cython(socket_wrapper) + +cdef void socket_read(grpc_custom_socket* socket, char* buffer, + size_t length, grpc_custom_read_callback cb) with gil: + sw = socket.impl + sw.read_cb = cb + sw.c_buffer = buffer + sw.len = length + _spawn_greenlet(socket_read_async, sw) + +cdef grpc_error* socket_getpeername(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + peer = (socket.impl).socket.getpeername() + + cdef grpc_resolved_address c_addr + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +cdef grpc_error* socket_getsockname(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + cdef grpc_resolved_address c_addr + if (socket.impl).socket is None: + peer = ('0.0.0.0', 0) + else: + peer = (socket.impl).socket.getsockname() + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +cdef grpc_error* socket_setsockopt(grpc_custom_socket* socket, int level, int optname, + const void *optval, uint32_t optlen) with gil: + # No-op; we provide a default set of options + return grpc_error_none() + +def applysockopts(s): + s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) + s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) + +cdef grpc_error* socket_bind(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + size_t len, int flags) with gil: + addr_tuple = sockaddr_to_tuple(addr, len) + try: + try: + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + except gevent_socket.gaierror as e: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + (socket.impl).socket = py_socket + except IOError as io_error: + return socket_error("bind", str(io_error)) + else: + return grpc_error_none() + +cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil: + (socket.impl).socket.listen(50) + return grpc_error_none() + +cdef void accept_callback_cython(SocketWrapper s): + try: + conn, address = s.socket.accept() + sw = SocketWrapper() + sw.closed = False + sw.c_socket = s.accepting_socket + sw.sockopts = [] + sw.socket = conn + sw.c_socket.impl = sw + sw.accepting_socket = NULL + cpython.Py_INCREF(sw) + s.accepting_socket = NULL + s.accept_cb(s.c_socket, sw.c_socket, grpc_error_none()) + except IOError as io_error: + #TODO actual error + s.accepting_socket = NULL + s.accept_cb(s.c_socket, s.accepting_socket, + socket_error("accept", str(io_error))) + if s.closed: + s.close_cb(s.c_socket) + g_event.set() + +def socket_accept_async(s): + accept_callback_cython(s) + +cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb) with gil: + sw = socket.impl + sw.accepting_socket = client + sw.accept_cb = cb + _spawn_greenlet(socket_accept_async, sw) + +##################################### +######Resolver implementation ####### +##################################### + +cdef class ResolveWrapper: + def __cinit__(self): + self.c_resolver = NULL + self.c_host = NULL + self.c_port = NULL + +cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): + try: + res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) + grpc_custom_resolve_callback(resolve_wrapper.c_resolver, + tuples_to_resolvaddr(res), grpc_error_none()) + except IOError as io_error: + grpc_custom_resolve_callback(resolve_wrapper.c_resolver, + 0, + socket_error("getaddrinfo", str(io_error))) + g_event.set() + +def socket_resolve_async_python(resolve_wrapper): + socket_resolve_async_cython(resolve_wrapper) + +cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil: + rw = ResolveWrapper() + rw.c_resolver = r + rw.c_host = host + rw.c_port = port + _spawn_greenlet(socket_resolve_async_python, rw) + +cdef grpc_error* socket_resolve(char* host, char* port, + grpc_resolved_addresses** res) with gil: + try: + result = gevent_socket.getaddrinfo(host, port) + res[0] = tuples_to_resolvaddr(result) + return grpc_error_none() + except IOError as io_error: + return socket_error("getaddrinfo", str(io_error)) + +############################### +### timer implementation ###### +############################### + +cdef class TimerWrapper: + def __cinit__(self, deadline): + self.timer = gevent_hub.get_hub().loop.timer(deadline) + self.event = None + + def start(self): + self.event = gevent_event.Event() + self.timer.start(self.on_finish) + + def on_finish(self): + grpc_custom_timer_callback(self.c_timer, grpc_error_none()) + self.timer.stop() + g_event.set() + + def stop(self): + self.event.set() + self.timer.stop() + +cdef void timer_start(grpc_custom_timer* t) with gil: + timer = TimerWrapper(t.timeout_ms / 1000.0) + timer.c_timer = t + t.timer = timer + timer.start() + +cdef void timer_stop(grpc_custom_timer* t) with gil: + time_wrapper = t.timer + time_wrapper.stop() + +############################### +### pollset implementation ### +############################### + +cdef void init_loop() with gil: + pass + +cdef void destroy_loop() with gil: + g_pool.join() + +cdef void kick_loop() with gil: + g_event.set() + +cdef void run_loop(size_t timeout_ms) with gil: + timeout = timeout_ms / 1000.0 + if timeout_ms > 0: + g_event.wait(timeout) + g_event.clear() + +############################### +### Initializer ############### +############################### + +cdef grpc_socket_vtable gevent_socket_vtable +cdef grpc_custom_resolver_vtable gevent_resolver_vtable +cdef grpc_custom_timer_vtable gevent_timer_vtable +cdef grpc_custom_poller_vtable gevent_pollset_vtable + +def init_grpc_gevent(): + # Lazily import gevent + global gevent_socket + global gevent_g + global gevent_hub + global gevent_event + global g_event + global g_pool + import gevent + gevent_g = gevent + import gevent.socket + gevent_socket = gevent.socket + import gevent.hub + gevent_hub = gevent.hub + import gevent.event + gevent_event = gevent.event + import gevent.pool + + g_event = gevent.event.Event() + g_pool = gevent.pool.Group() + gevent_resolver_vtable.resolve = socket_resolve + gevent_resolver_vtable.resolve_async = socket_resolve_async + + gevent_socket_vtable.init = socket_init + gevent_socket_vtable.connect = socket_connect + gevent_socket_vtable.destroy = socket_destroy + gevent_socket_vtable.shutdown = socket_shutdown + gevent_socket_vtable.close = socket_close + gevent_socket_vtable.write = socket_write + gevent_socket_vtable.read = socket_read + gevent_socket_vtable.getpeername = socket_getpeername + gevent_socket_vtable.getsockname = socket_getsockname + gevent_socket_vtable.setsockopt = socket_setsockopt + gevent_socket_vtable.bind = socket_bind + gevent_socket_vtable.listen = socket_listen + gevent_socket_vtable.accept = socket_accept + + gevent_timer_vtable.start = timer_start + gevent_timer_vtable.stop = timer_stop + + gevent_pollset_vtable.init = init_loop + gevent_pollset_vtable.poll = run_loop + gevent_pollset_vtable.kick = kick_loop + gevent_pollset_vtable.shutdown = destroy_loop + + grpc_custom_iomgr_init(&gevent_socket_vtable, + &gevent_resolver_vtable, + &gevent_timer_vtable, + &gevent_pollset_vtable) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index b6a794c6d7b..c8ace7c3cc8 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# distutils: language=c++ include "_cygrpc/grpc.pxi" @@ -27,3 +28,5 @@ include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" include "_cygrpc/tag.pxd.pxi" include "_cygrpc/time.pxd.pxi" + +include "_cygrpc/grpc_gevent.pxd" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 2ee2e6b73e8..f5f08fc9838 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# distutils: language=c++ cimport cpython @@ -35,6 +36,8 @@ include "_cygrpc/server.pyx.pxi" include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" +include "_cygrpc/grpc_gevent.pyx" + # # initialize gRPC # diff --git a/src/python/grpcio/grpc/experimental/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py new file mode 100644 index 00000000000..dcec322b69b --- /dev/null +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""gRPC's experimental APIs. + +These APIs are subject to be removed during any minor version release. +""" diff --git a/src/python/grpcio/grpc/experimental/gevent.py b/src/python/grpcio/grpc/experimental/gevent.py new file mode 100644 index 00000000000..159d612b4ed --- /dev/null +++ b/src/python/grpcio/grpc/experimental/gevent.py @@ -0,0 +1,27 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""gRPC's Python gEvent APIs.""" + +from grpc._cython import cygrpc as _cygrpc + + +def init_gevent(): + """Patches gRPC's libraries to be compatible with gevent. + + This must be called AFTER the python standard lib has been patched, + but BEFORE creating and gRPC objects. + + In order for progress to be made, the application must drive the event loop. + """ + _cygrpc.init_grpc_gevent() diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 93f84572b76..d4dbcdc191c 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -108,6 +108,55 @@ class TestLite(setuptools.Command): self.distribution.fetch_build_eggs(self.distribution.tests_require) +class TestGevent(setuptools.Command): + """Command to run tests w/gevent.""" + + BANNED_TESTS = ( + # These tests send a lot of RPCs and are really slow on gevent. They will + # eventually succeed, but need to dig into performance issues. + 'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs', + 'unit._cython._no_messages_single_server_completion_queue_test.Test.test_rpcs', + # I have no idea why this doesn't work in gevent, but it shouldn't even be + # using the c-core + 'testing._client_test.ClientTest.test_infinite_request_stream_real_time', + # TODO(https://github.com/grpc/grpc/issues/14789) enable this test + 'unit._server_ssl_cert_config_test', + # Beta API is unsupported for gevent + 'protoc_plugin.beta_python_plugin_test', + 'unit.beta._beta_features_test', + ) + description = 'run tests with gevent. Assumes grpc/gevent are installed' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + # distutils requires this override. + pass + + def run(self): + from gevent import monkey + monkey.patch_all() + + import tests + + import grpc.experimental.gevent + grpc.experimental.gevent.init_gevent() + + import gevent + + import tests + loader = tests.Loader() + loader.loadTestsFromNames(['tests']) + runner = tests.Runner() + runner.skip_tests(self.BANNED_TESTS) + result = gevent.spawn(runner.run, loader.suite) + result.join() + if not result.value.wasSuccessful(): + sys.exit('Test failure') + + class RunInterop(test.test): description = 'run interop test client/server' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 250df658034..4e0f6726fa5 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -50,7 +50,8 @@ COMMAND_CLASS = { 'build_package_protos': grpc_tools.command.BuildPackageProtos, 'build_py': commands.BuildPy, 'run_interop': commands.RunInterop, - 'test_lite': commands.TestLite + 'test_lite': commands.TestLite, + 'test_gevent': commands.TestGevent, } PACKAGE_DATA = { diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index 8e27dc6c6df..eaaa027e61f 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -117,6 +117,12 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])): class Runner(object): + def __init__(self): + self._skipped_tests = [] + + def skip_tests(self, tests): + self._skipped_tests = tests + def run(self, suite): """See setuptools' test_runner setup argument for information.""" # only run test cases with id starting with given prefix @@ -181,27 +187,31 @@ class Runner(object): # Run the tests result.startTestRun() for augmented_case in augmented_cases: - sys.stdout.write('Running {}\n'.format( - augmented_case.case.id())) - sys.stdout.flush() - case_thread = threading.Thread( - target=augmented_case.case.run, args=(result,)) - try: - with stdout_pipe, stderr_pipe: - case_thread.start() - while case_thread.is_alive(): - check_kill_self() - time.sleep(0) - case_thread.join() - except: - # re-raise the exception after forcing the with-block to end - raise - result.set_output(augmented_case.case, stdout_pipe.output(), - stderr_pipe.output()) - sys.stdout.write(result_out.getvalue()) - sys.stdout.flush() - result_out.truncate(0) - check_kill_self() + for skipped_test in self._skipped_tests: + if skipped_test in augmented_case.case.id(): + break + else: + sys.stdout.write('Running {}\n'.format( + augmented_case.case.id())) + sys.stdout.flush() + case_thread = threading.Thread( + target=augmented_case.case.run, args=(result,)) + try: + with stdout_pipe, stderr_pipe: + case_thread.start() + while case_thread.is_alive(): + check_kill_self() + time.sleep(0) + case_thread.join() + except: + # re-raise the exception after forcing the with-block to end + raise + result.set_output(augmented_case.case, stdout_pipe.output(), + stderr_pipe.output()) + sys.stdout.write(result_out.getvalue()) + sys.stdout.flush() + result_out.truncate(0) + check_kill_self() result.stopTestRun() stdout_pipe.close() stderr_pipe.close() diff --git a/tools/run_tests/helper_scripts/build_python.sh b/tools/run_tests/helper_scripts/build_python.sh index b809fe012f5..dac29b91dda 100755 --- a/tools/run_tests/helper_scripts/build_python.sh +++ b/tools/run_tests/helper_scripts/build_python.sh @@ -152,6 +152,12 @@ pip_install_dir() { cd "$PWD" } +case "$VENV" in + *gevent*) + $VENV_PYTHON -m pip install gevent + ;; +esac + $VENV_PYTHON -m pip install --upgrade pip==9.0.1 $VENV_PYTHON -m pip install setuptools $VENV_PYTHON -m pip install cython diff --git a/tools/run_tests/helper_scripts/run_python.sh b/tools/run_tests/helper_scripts/run_python.sh index bcfe3a6577f..2b7321e527a 100755 --- a/tools/run_tests/helper_scripts/run_python.sh +++ b/tools/run_tests/helper_scripts/run_python.sh @@ -22,7 +22,7 @@ PYTHON=$(realpath "${1:-py27/bin/python}") ROOT=$(pwd) -$PYTHON "$ROOT/src/python/grpcio_tests/setup.py" test_lite +$PYTHON "$ROOT/src/python/grpcio_tests/setup.py" "$2" mkdir -p "$ROOT/reports" rm -rf "$ROOT/reports/python-coverage" diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index ba4ab3bd362..970b0d79022 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -545,13 +545,13 @@ class PythonLanguage: def client_cmd(self, args): return [ - 'py27/bin/python', 'src/python/grpcio_tests/setup.py', + 'py27_native/bin/python', 'src/python/grpcio_tests/setup.py', 'run_interop', '--client', '--args="{}"'.format(' '.join(args)) ] def client_cmd_http2interop(self, args): return [ - 'py27/bin/python', + 'py27_native/bin/python', 'src/python/grpcio_tests/tests/http2/negative_http2_client.py', ] + args @@ -560,7 +560,7 @@ class PythonLanguage: def server_cmd(self, args): return [ - 'py27/bin/python', 'src/python/grpcio_tests/setup.py', + 'py27_native/bin/python', 'src/python/grpcio_tests/setup.py', 'run_interop', '--server', '--args="{}"'.format(' '.join(args)) ] diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 85c7f5c4dc0..4146eec42df 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -204,19 +204,28 @@ def _is_use_docker_child(): _PythonConfigVars = collections.namedtuple('_ConfigVars', [ - 'shell', 'builder', 'builder_prefix_arguments', 'venv_relative_python', - 'toolchain', 'runner' + 'shell', + 'builder', + 'builder_prefix_arguments', + 'venv_relative_python', + 'toolchain', + 'runner', + 'test_name', + 'iomgr_platform', ]) def _python_config_generator(name, major, minor, bits, config_vars): + name += '_' + config_vars.iomgr_platform return PythonConfig( name, config_vars.shell + config_vars.builder + config_vars.builder_prefix_arguments + [ _python_pattern_function(major=major, minor=minor, bits=bits) ] + [name] + config_vars.venv_relative_python + config_vars.toolchain, - config_vars.shell + config_vars.runner + - [os.path.join(name, config_vars.venv_relative_python[0])]) + config_vars.shell + config_vars.runner + [ + os.path.join(name, config_vars.venv_relative_python[0]), + config_vars.test_name + ]) def _pypy_config_generator(name, major, config_vars): @@ -281,7 +290,7 @@ class CLanguage(object): self._docker_distro, self._make_options = self._compiler_options( self.args.use_docker, self.args.compiler) if args.iomgr_platform == "uv": - cflags = '-DGRPC_UV -DGRPC_CUSTOM_IOMGR_THREAD_CHECK ' + cflags = '-DGRPC_UV -DGRPC_CUSTOM_IOMGR_THREAD_CHECK -DGRPC_CUSTOM_SOCKET ' try: cflags += subprocess.check_output( ['pkg-config', '--cflags', 'libuv']).strip() + ' ' @@ -770,12 +779,16 @@ class PythonLanguage(object): venv_relative_python = ['bin/python'] toolchain = ['unix'] + test_command = 'test_lite' + if args.iomgr_platform == 'gevent': + test_command = 'test_gevent' runner = [ os.path.abspath('tools/run_tests/helper_scripts/run_python.sh') ] - config_vars = _PythonConfigVars(shell, builder, - builder_prefix_arguments, - venv_relative_python, toolchain, runner) + + config_vars = _PythonConfigVars( + shell, builder, builder_prefix_arguments, venv_relative_python, + toolchain, runner, test_command, args.iomgr_platform) python27_config = _python_config_generator( name='py27', major='2', @@ -1341,7 +1354,7 @@ argp.add_argument( ) argp.add_argument( '--iomgr_platform', - choices=['native', 'uv'], + choices=['native', 'uv', 'gevent'], default='native', help='Selects iomgr platform to build on') argp.add_argument( diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py index 1c99e794a93..e43319beba6 100755 --- a/tools/run_tests/run_tests_matrix.py +++ b/tools/run_tests/run_tests_matrix.py @@ -114,7 +114,7 @@ def _workspace_jobspec(name, def _generate_jobs(languages, configs, platforms, - iomgr_platform='native', + iomgr_platforms=['native'], arch=None, compiler=None, labels=[], @@ -125,40 +125,43 @@ def _generate_jobs(languages, result = [] for language in languages: for platform in platforms: - for config in configs: - name = '%s_%s_%s_%s' % (language, platform, config, - iomgr_platform) - runtests_args = [ - '-l', language, '-c', config, '--iomgr_platform', - iomgr_platform - ] - if arch or compiler: - name += '_%s_%s' % (arch, compiler) - runtests_args += ['--arch', arch, '--compiler', compiler] - if '--build_only' in extra_args: - name += '_buildonly' - for extra_env in extra_envs: - name += '_%s_%s' % (extra_env, extra_envs[extra_env]) - - runtests_args += extra_args - if platform == 'linux': - job = _docker_jobspec( - name=name, - runtests_args=runtests_args, - runtests_envs=extra_envs, - inner_jobs=inner_jobs, - timeout_seconds=timeout_seconds) - else: - job = _workspace_jobspec( - name=name, - runtests_args=runtests_args, - runtests_envs=extra_envs, - inner_jobs=inner_jobs, - timeout_seconds=timeout_seconds) - - job.labels = [platform, config, language, iomgr_platform - ] + labels - result.append(job) + for iomgr_platform in iomgr_platforms: + for config in configs: + name = '%s_%s_%s_%s' % (language, platform, config, + iomgr_platform) + runtests_args = [ + '-l', language, '-c', config, '--iomgr_platform', + iomgr_platform + ] + if arch or compiler: + name += '_%s_%s' % (arch, compiler) + runtests_args += [ + '--arch', arch, '--compiler', compiler + ] + if '--build_only' in extra_args: + name += '_buildonly' + for extra_env in extra_envs: + name += '_%s_%s' % (extra_env, extra_envs[extra_env]) + + runtests_args += extra_args + if platform == 'linux': + job = _docker_jobspec( + name=name, + runtests_args=runtests_args, + runtests_envs=extra_envs, + inner_jobs=inner_jobs, + timeout_seconds=timeout_seconds) + else: + job = _workspace_jobspec( + name=name, + runtests_args=runtests_args, + runtests_envs=extra_envs, + inner_jobs=inner_jobs, + timeout_seconds=timeout_seconds) + + job.labels = [platform, config, language, iomgr_platform + ] + labels + result.append(job) return result @@ -184,13 +187,22 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS): timeout_seconds=_CPP_RUNTESTS_TIMEOUT) test_jobs += _generate_jobs( - languages=['csharp', 'python'], + languages=['csharp'], configs=['dbg', 'opt'], platforms=['linux', 'macos', 'windows'], labels=['basictests', 'multilang'], extra_args=extra_args, inner_jobs=inner_jobs) + test_jobs += _generate_jobs( + languages=['python'], + configs=['opt'], + platforms=['linux', 'macos', 'windows'], + iomgr_platforms=['native', 'gevent'], + labels=['basictests', 'multilang'], + extra_args=extra_args, + inner_jobs=inner_jobs) + # supported on linux and mac. test_jobs += _generate_jobs( languages=['c++'], @@ -377,7 +389,7 @@ def _create_portability_test_jobs(extra_args=[], languages=['c'], configs=['dbg'], platforms=['linux'], - iomgr_platform='uv', + iomgr_platforms=['uv'], labels=['portability', 'corelang'], extra_args=extra_args, inner_jobs=inner_jobs,