Initial gevent support

Because some cpp code ends up leaking into cython, we change
the cython generator to generate cpp code.
pull/14561/head
kpayson64 7 years ago
parent 0bb2fe946e
commit 1bfff8eec0
  1. 6
      .gitignore
  2. 10
      setup.py
  3. 46
      src/core/lib/iomgr/gevent_util.h
  4. 2
      src/core/lib/iomgr/port.h
  5. 2
      src/core/lib/iomgr/sockaddr_utils.cc
  6. 2
      src/python/grpcio/grpc/_cython/.gitignore
  7. 154
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd
  8. 454
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx
  9. 3
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  10. 3
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  11. 17
      src/python/grpcio/grpc/experimental/__init__.py
  12. 27
      src/python/grpcio/grpc/experimental/gevent.py
  13. 49
      src/python/grpcio_tests/commands.py
  14. 3
      src/python/grpcio_tests/setup.py
  15. 52
      src/python/grpcio_tests/tests/_runner.py
  16. 6
      tools/run_tests/helper_scripts/build_python.sh
  17. 2
      tools/run_tests/helper_scripts/run_python.sh
  18. 6
      tools/run_tests/run_interop_tests.py
  19. 31
      tools/run_tests/run_tests.py
  20. 86
      tools/run_tests/run_tests_matrix.py

6
.gitignore vendored

@ -15,8 +15,10 @@ python_pylint_venv/
htmlcov/
dist/
*.egg
py27/
py3[0-9]*/
py27_gevent/
py27_native/
py3[0-9]_gevent/
py3[0-9]_native/
# Node installation output
node_modules

@ -103,9 +103,9 @@ ENABLE_DOCUMENTATION_BUILD = os.environ.get(
EXTRA_ENV_COMPILE_ARGS = os.environ.get('GRPC_PYTHON_CFLAGS', None)
EXTRA_ENV_LINK_ARGS = os.environ.get('GRPC_PYTHON_LDFLAGS', None)
if EXTRA_ENV_COMPILE_ARGS is None:
EXTRA_ENV_COMPILE_ARGS = ''
EXTRA_ENV_COMPILE_ARGS = ' -std=c++11'
if 'win32' in sys.platform and sys.version_info < (3, 5):
EXTRA_ENV_COMPILE_ARGS += ' -std=c++11'
EXTRA_ENV_COMPILE_ARGS += ' -D_hypot=hypot'
# We use define flags here and don't directly add to DEFINE_MACROS below to
# ensure that the expert user/builder has a way of turning it off (via the
# envvars) without adding yet more GRPC-specific envvars.
@ -115,10 +115,10 @@ if EXTRA_ENV_COMPILE_ARGS is None:
else:
EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime64 -D_timeb=__timeb64'
elif "linux" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -std=c++11 -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions'
elif "darwin" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_16BIT'
EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_16BIT'
if EXTRA_ENV_LINK_ARGS is None:
EXTRA_ENV_LINK_ARGS = ''
@ -181,7 +181,7 @@ LDFLAGS = tuple(EXTRA_LINK_ARGS)
CFLAGS = tuple(EXTRA_COMPILE_ARGS)
if "linux" in sys.platform or "darwin" in sys.platform:
pymodinit_type = 'PyObject*' if PY3 else 'void'
pymodinit = '__attribute__((visibility ("default"))) {}'.format(pymodinit_type)
pymodinit = 'extern "C" __attribute__((visibility ("default"))) {}'.format(pymodinit_type)
DEFINE_MACROS += (('PyMODINIT_FUNC', pymodinit),)
DEFINE_MACROS += (('GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK', 1),)

@ -0,0 +1,46 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_GEVENT_UTIL_H
#define GRPC_CORE_LIB_IOMGR_GEVENT_UTIL_H
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/status.h>
#include "src/core/lib/iomgr/error.h"
// These are only used by the gRPC Python extension for gevent
// support. They are easier to define here (rather than in Cython)
// because Cython doesn't handle #defines well.
grpc_error* grpc_socket_error(char* error) {
return grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) {
return (char*)GRPC_SLICE_START_PTR(buffer->slices[i]);
}
int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) {
return GRPC_SLICE_LENGTH(buffer->slices[i]);
}
#endif

@ -22,8 +22,10 @@
#define GRPC_CORE_LIB_IOMGR_PORT_H
#ifdef GRPC_UV
#ifndef GRPC_CUSTOM_SOCKET
#define GRPC_CUSTOM_SOCKET
#endif
#endif
#if defined(GRPC_CUSTOM_SOCKET)
// Do Nothing
#elif defined(GPR_MANYLINUX1)

@ -204,6 +204,8 @@ void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port) {
if (grpc_inet_pton(GRPC_AF_INET6, addr, &addr6->sin6_addr) == 1) {
addr6->sin6_family = GRPC_AF_INET6;
addr6->sin6_flowinfo = 0;
addr6->sin6_scope_id = 0;
out->len = sizeof(grpc_sockaddr_in6);
} else if (grpc_inet_pton(GRPC_AF_INET, addr, &addr4->sin_addr) == 1) {
addr4->sin_family = GRPC_AF_INET;

@ -1,4 +1,4 @@
cygrpc.c
cygrpc.cpp
*.a
*.so
*.dll

@ -0,0 +1,154 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
from libc.stdint cimport uint32_t
cdef extern from "grpc/impl/codegen/slice.h":
struct grpc_slice_buffer:
int count
cdef extern from "src/core/lib/iomgr/error.h":
struct grpc_error:
pass
cdef extern from "src/core/lib/iomgr/gevent_util.h":
grpc_error* grpc_socket_error(char* error)
char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
cdef extern from "src/core/lib/iomgr/sockaddr.h":
ctypedef struct grpc_sockaddr:
pass
cdef extern from "src/core/lib/iomgr/resolve_address.h":
ctypedef struct grpc_resolved_addresses:
size_t naddrs
grpc_resolved_address* addrs
ctypedef struct grpc_resolved_address:
char[128] addr
size_t len
cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
struct grpc_custom_resolver:
pass
struct grpc_custom_resolver_vtable:
grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
grpc_resolved_addresses* result,
grpc_error* error);
cdef extern from "src/core/lib/iomgr/tcp_custom.h":
struct grpc_custom_socket:
void* impl
# We don't care about the rest of the fields
ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
grpc_error* error)
ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
size_t nread, grpc_error* error)
ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
grpc_custom_socket* client,
grpc_error* error)
ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
struct grpc_socket_vtable:
grpc_error* (*init)(grpc_custom_socket* socket, int domain);
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, grpc_custom_connect_callback cb);
void (*destroy)(grpc_custom_socket* socket);
void (*shutdown)(grpc_custom_socket* socket);
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
grpc_custom_write_callback cb);
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
grpc_custom_read_callback cb);
grpc_error* (*getpeername)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname,
const void* optval, uint32_t len);
grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags);
grpc_error* (*listen)(grpc_custom_socket* socket);
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb);
cdef extern from "src/core/lib/iomgr/timer_custom.h":
struct grpc_custom_timer:
void* timer
int timeout_ms
# We don't care about the rest of the fields
struct grpc_custom_timer_vtable:
void (*start)(grpc_custom_timer* t);
void (*stop)(grpc_custom_timer* t);
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)()
void (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()
cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
grpc_custom_timer_vtable* timer,
grpc_custom_poller_vtable* poller);
cdef extern from "src/core/lib/iomgr/sockaddr_utils.h":
int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr,
int normalize);
void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);
int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
int port)
const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)
cdef class TimerWrapper:
cdef grpc_custom_timer *c_timer
cdef object timer
cdef object event
cdef class SocketWrapper:
cdef object sockopts
cdef object socket
cdef object closed
cdef grpc_custom_socket *c_socket
cdef char* c_buffer
cdef size_t len
cdef grpc_custom_socket *accepting_socket
cdef grpc_custom_connect_callback connect_cb
cdef grpc_custom_write_callback write_cb
cdef grpc_custom_read_callback read_cb
cdef grpc_custom_accept_callback accept_cb
cdef grpc_custom_close_callback close_cb
cdef class ResolveWrapper:
cdef grpc_custom_resolver *c_resolver
cdef char* c_host
cdef char* c_port

@ -0,0 +1,454 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
cimport cpython
from libc cimport string
from libc.stdlib cimport malloc, free
import errno
gevent_g = None
gevent_socket = None
gevent_hub = None
gevent_event = None
g_event = None
g_pool = None
cdef grpc_error* grpc_error_none():
return <grpc_error*>0
cdef grpc_error* socket_error(str syscall, str err):
error_str = "{} failed: {}".format(syscall, err)
error_bytes = str_to_bytes(error_str)
return grpc_socket_error(error_bytes)
cdef resolved_addr_to_tuple(grpc_resolved_address* address):
cdef char* res_str
port = grpc_sockaddr_get_port(address)
str_len = grpc_sockaddr_to_string(&res_str, address, 0)
byte_str = _decode(<bytes>res_str[:str_len])
if byte_str.endswith(':' + str(port)):
byte_str = byte_str[:(0 - len(str(port)) - 1)]
byte_str = byte_str.lstrip('[')
byte_str = byte_str.rstrip(']')
byte_str = '{}'.format(byte_str)
return byte_str, port
cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return resolved_addr_to_tuple(&c_addr)
cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
cdef grpc_resolved_addresses* addresses
tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
addresses.naddrs = len(tups_set)
addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
i = 0
for tup in set(tups_set):
hostname = str_to_bytes(tup[0])
grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
i += 1
return addresses
def _spawn_greenlet(*args):
greenlet = g_pool.spawn(*args)
###############################
### socket implementation ###
###############################
cdef class SocketWrapper:
def __cinit__(self):
self.sockopts = []
self.socket = None
self.c_socket = NULL
self.c_buffer = NULL
self.len = 0
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
sw = SocketWrapper()
sw.c_socket = socket
sw.sockopts = []
cpython.Py_INCREF(sw)
# Python doesn't support AF_UNSPEC sockets, so we defer creation until
# bind/connect when we know what type of socket we need
sw.socket = None
sw.closed = False
sw.accepting_socket = NULL
socket.impl = <void*>sw
return grpc_error_none()
cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
try:
socket_wrapper.socket.connect(addr_tuple)
socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
grpc_error_none())
except IOError as io_error:
socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
socket_error("connect", str(io_error)))
g_event.set()
def socket_connect_async(socket_wrapper, addr_tuple):
socket_connect_async_cython(socket_wrapper, addr_tuple)
cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback cb) with gil:
py_socket = None
socket_wrapper = <SocketWrapper>socket.impl
socket_wrapper.connect_cb = cb
addr_tuple = sockaddr_to_tuple(addr, addr_len)
if sockaddr_is_ipv4(addr, addr_len):
py_socket = gevent_socket.socket(gevent_socket.AF_INET)
else:
py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
applysockopts(py_socket)
socket_wrapper.socket = py_socket
_spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
cdef void socket_destroy(grpc_custom_socket* socket) with gil:
cpython.Py_DECREF(<SocketWrapper>socket.impl)
cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
try:
(<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
except IOError as io_error:
if io_error.errno != errno.ENOTCONN:
raise io_error
cdef void socket_close(grpc_custom_socket* socket,
grpc_custom_close_callback cb) with gil:
socket_wrapper = (<SocketWrapper>socket.impl)
if socket_wrapper.socket is not None:
socket_wrapper.socket.close()
socket_wrapper.closed = True
socket_wrapper.close_cb = cb
# Delay the close callback until the accept() call has picked it up
if socket_wrapper.accepting_socket != NULL:
return
socket_wrapper.close_cb(socket)
def socket_sendmsg(socket, write_bytes):
try:
return socket.sendmsg(write_bytes)
except AttributeError:
# sendmsg not available on all Pythons/Platforms
return socket.send(b''.join(write_bytes))
cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
try:
while write_bytes:
sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
while sent_byte_count > 0:
if sent_byte_count < len(write_bytes[0]):
write_bytes[0] = write_bytes[0][sent_byte_count:]
sent_byte_count = 0
else:
sent_byte_count -= len(write_bytes[0])
write_bytes = write_bytes[1:]
socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
grpc_error_none())
except IOError as io_error:
socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
socket_error("send", str(io_error)))
g_event.set()
def socket_write_async(socket_wrapper, write_bytes):
socket_write_async_cython(socket_wrapper, write_bytes)
cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
grpc_custom_write_callback cb) with gil:
cdef char* start
sw = <SocketWrapper>socket.impl
sw.write_cb = cb
write_bytes = []
for i in range(buffer.count):
start = grpc_slice_buffer_start(buffer, i)
length = grpc_slice_buffer_length(buffer, i)
write_bytes.append(<bytes>start[:length])
_spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes)
cdef socket_read_async_cython(SocketWrapper socket_wrapper):
cdef char* buff_char_arr
try:
buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
buff_char_arr = buff_str
string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
len(buff_str), grpc_error_none())
except IOError as io_error:
socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-1, socket_error("recv", str(io_error)))
g_event.set()
def socket_read_async(socket_wrapper):
socket_read_async_cython(socket_wrapper)
cdef void socket_read(grpc_custom_socket* socket, char* buffer,
size_t length, grpc_custom_read_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.read_cb = cb
sw.c_buffer = buffer
sw.len = length
_spawn_greenlet(socket_read_async, sw)
cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) with gil:
cdef char* src_buf
peer = (<SocketWrapper>socket.impl).socket.getpeername()
cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) with gil:
cdef char* src_buf
cdef grpc_resolved_address c_addr
if (<SocketWrapper>socket.impl).socket is None:
peer = ('0.0.0.0', 0)
else:
peer = (<SocketWrapper>socket.impl).socket.getsockname()
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error* socket_setsockopt(grpc_custom_socket* socket, int level, int optname,
const void *optval, uint32_t optlen) with gil:
# No-op; we provide a default set of options
return grpc_error_none()
def applysockopts(s):
s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)
cdef grpc_error* socket_bind(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
addr_tuple = sockaddr_to_tuple(addr, len)
try:
try:
py_socket = gevent_socket.socket(gevent_socket.AF_INET)
applysockopts(py_socket)
py_socket.bind(addr_tuple)
except gevent_socket.gaierror as e:
py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
applysockopts(py_socket)
py_socket.bind(addr_tuple)
(<SocketWrapper>socket.impl).socket = py_socket
except IOError as io_error:
return socket_error("bind", str(io_error))
else:
return grpc_error_none()
cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
(<SocketWrapper>socket.impl).socket.listen(50)
return grpc_error_none()
cdef void accept_callback_cython(SocketWrapper s):
try:
conn, address = s.socket.accept()
sw = SocketWrapper()
sw.closed = False
sw.c_socket = s.accepting_socket
sw.sockopts = []
sw.socket = conn
sw.c_socket.impl = <void*>sw
sw.accepting_socket = NULL
cpython.Py_INCREF(sw)
s.accepting_socket = NULL
s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
except IOError as io_error:
#TODO actual error
s.accepting_socket = NULL
s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
socket_error("accept", str(io_error)))
if s.closed:
s.close_cb(<grpc_custom_socket*>s.c_socket)
g_event.set()
def socket_accept_async(s):
accept_callback_cython(s)
cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.accepting_socket = client
sw.accept_cb = cb
_spawn_greenlet(socket_accept_async, sw)
#####################################
######Resolver implementation #######
#####################################
cdef class ResolveWrapper:
def __cinit__(self):
self.c_resolver = NULL
self.c_host = NULL
self.c_port = NULL
cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
try:
res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
tuples_to_resolvaddr(res), grpc_error_none())
except IOError as io_error:
grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
<grpc_resolved_addresses*>0,
socket_error("getaddrinfo", str(io_error)))
g_event.set()
def socket_resolve_async_python(resolve_wrapper):
socket_resolve_async_cython(resolve_wrapper)
cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil:
rw = ResolveWrapper()
rw.c_resolver = r
rw.c_host = host
rw.c_port = port
_spawn_greenlet(socket_resolve_async_python, rw)
cdef grpc_error* socket_resolve(char* host, char* port,
grpc_resolved_addresses** res) with gil:
try:
result = gevent_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)
return grpc_error_none()
except IOError as io_error:
return socket_error("getaddrinfo", str(io_error))
###############################
### timer implementation ######
###############################
cdef class TimerWrapper:
def __cinit__(self, deadline):
self.timer = gevent_hub.get_hub().loop.timer(deadline)
self.event = None
def start(self):
self.event = gevent_event.Event()
self.timer.start(self.on_finish)
def on_finish(self):
grpc_custom_timer_callback(self.c_timer, grpc_error_none())
self.timer.stop()
g_event.set()
def stop(self):
self.event.set()
self.timer.stop()
cdef void timer_start(grpc_custom_timer* t) with gil:
timer = TimerWrapper(t.timeout_ms / 1000.0)
timer.c_timer = t
t.timer = <void*>timer
timer.start()
cdef void timer_stop(grpc_custom_timer* t) with gil:
time_wrapper = <object>t.timer
time_wrapper.stop()
###############################
### pollset implementation ###
###############################
cdef void init_loop() with gil:
pass
cdef void destroy_loop() with gil:
g_pool.join()
cdef void kick_loop() with gil:
g_event.set()
cdef void run_loop(size_t timeout_ms) with gil:
timeout = timeout_ms / 1000.0
if timeout_ms > 0:
g_event.wait(timeout)
g_event.clear()
###############################
### Initializer ###############
###############################
cdef grpc_socket_vtable gevent_socket_vtable
cdef grpc_custom_resolver_vtable gevent_resolver_vtable
cdef grpc_custom_timer_vtable gevent_timer_vtable
cdef grpc_custom_poller_vtable gevent_pollset_vtable
def init_grpc_gevent():
# Lazily import gevent
global gevent_socket
global gevent_g
global gevent_hub
global gevent_event
global g_event
global g_pool
import gevent
gevent_g = gevent
import gevent.socket
gevent_socket = gevent.socket
import gevent.hub
gevent_hub = gevent.hub
import gevent.event
gevent_event = gevent.event
import gevent.pool
g_event = gevent.event.Event()
g_pool = gevent.pool.Group()
gevent_resolver_vtable.resolve = socket_resolve
gevent_resolver_vtable.resolve_async = socket_resolve_async
gevent_socket_vtable.init = socket_init
gevent_socket_vtable.connect = socket_connect
gevent_socket_vtable.destroy = socket_destroy
gevent_socket_vtable.shutdown = socket_shutdown
gevent_socket_vtable.close = socket_close
gevent_socket_vtable.write = socket_write
gevent_socket_vtable.read = socket_read
gevent_socket_vtable.getpeername = socket_getpeername
gevent_socket_vtable.getsockname = socket_getsockname
gevent_socket_vtable.setsockopt = socket_setsockopt
gevent_socket_vtable.bind = socket_bind
gevent_socket_vtable.listen = socket_listen
gevent_socket_vtable.accept = socket_accept
gevent_timer_vtable.start = timer_start
gevent_timer_vtable.stop = timer_stop
gevent_pollset_vtable.init = init_loop
gevent_pollset_vtable.poll = run_loop
gevent_pollset_vtable.kick = kick_loop
gevent_pollset_vtable.shutdown = destroy_loop
grpc_custom_iomgr_init(&gevent_socket_vtable,
&gevent_resolver_vtable,
&gevent_timer_vtable,
&gevent_pollset_vtable)

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
include "_cygrpc/grpc.pxi"
@ -27,3 +28,5 @@ include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"
include "_cygrpc/time.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd"

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
cimport cpython
@ -35,6 +36,8 @@ include "_cygrpc/server.pyx.pxi"
include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx"
#
# initialize gRPC
#

@ -0,0 +1,17 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC's experimental APIs.
These APIs are subject to be removed during any minor version release.
"""

@ -0,0 +1,27 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gRPC's Python gEvent APIs."""
from grpc._cython import cygrpc as _cygrpc
def init_gevent():
"""Patches gRPC's libraries to be compatible with gevent.
This must be called AFTER the python standard lib has been patched,
but BEFORE creating and gRPC objects.
In order for progress to be made, the application must drive the event loop.
"""
_cygrpc.init_grpc_gevent()

@ -108,6 +108,55 @@ class TestLite(setuptools.Command):
self.distribution.fetch_build_eggs(self.distribution.tests_require)
class TestGevent(setuptools.Command):
"""Command to run tests w/gevent."""
BANNED_TESTS = (
# These tests send a lot of RPCs and are really slow on gevent. They will
# eventually succeed, but need to dig into performance issues.
'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs',
'unit._cython._no_messages_single_server_completion_queue_test.Test.test_rpcs',
# I have no idea why this doesn't work in gevent, but it shouldn't even be
# using the c-core
'testing._client_test.ClientTest.test_infinite_request_stream_real_time',
# TODO(https://github.com/grpc/grpc/issues/14789) enable this test
'unit._server_ssl_cert_config_test',
# Beta API is unsupported for gevent
'protoc_plugin.beta_python_plugin_test',
'unit.beta._beta_features_test',
)
description = 'run tests with gevent. Assumes grpc/gevent are installed'
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
# distutils requires this override.
pass
def run(self):
from gevent import monkey
monkey.patch_all()
import tests
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
import gevent
import tests
loader = tests.Loader()
loader.loadTestsFromNames(['tests'])
runner = tests.Runner()
runner.skip_tests(self.BANNED_TESTS)
result = gevent.spawn(runner.run, loader.suite)
result.join()
if not result.value.wasSuccessful():
sys.exit('Test failure')
class RunInterop(test.test):
description = 'run interop test client/server'

@ -50,7 +50,8 @@ COMMAND_CLASS = {
'build_package_protos': grpc_tools.command.BuildPackageProtos,
'build_py': commands.BuildPy,
'run_interop': commands.RunInterop,
'test_lite': commands.TestLite
'test_lite': commands.TestLite,
'test_gevent': commands.TestGevent,
}
PACKAGE_DATA = {

@ -117,6 +117,12 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
class Runner(object):
def __init__(self):
self._skipped_tests = []
def skip_tests(self, tests):
self._skipped_tests = tests
def run(self, suite):
"""See setuptools' test_runner setup argument for information."""
# only run test cases with id starting with given prefix
@ -181,27 +187,31 @@ class Runner(object):
# Run the tests
result.startTestRun()
for augmented_case in augmented_cases:
sys.stdout.write('Running {}\n'.format(
augmented_case.case.id()))
sys.stdout.flush()
case_thread = threading.Thread(
target=augmented_case.case.run, args=(result,))
try:
with stdout_pipe, stderr_pipe:
case_thread.start()
while case_thread.is_alive():
check_kill_self()
time.sleep(0)
case_thread.join()
except:
# re-raise the exception after forcing the with-block to end
raise
result.set_output(augmented_case.case, stdout_pipe.output(),
stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
check_kill_self()
for skipped_test in self._skipped_tests:
if skipped_test in augmented_case.case.id():
break
else:
sys.stdout.write('Running {}\n'.format(
augmented_case.case.id()))
sys.stdout.flush()
case_thread = threading.Thread(
target=augmented_case.case.run, args=(result,))
try:
with stdout_pipe, stderr_pipe:
case_thread.start()
while case_thread.is_alive():
check_kill_self()
time.sleep(0)
case_thread.join()
except:
# re-raise the exception after forcing the with-block to end
raise
result.set_output(augmented_case.case, stdout_pipe.output(),
stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
check_kill_self()
result.stopTestRun()
stdout_pipe.close()
stderr_pipe.close()

@ -152,6 +152,12 @@ pip_install_dir() {
cd "$PWD"
}
case "$VENV" in
*gevent*)
$VENV_PYTHON -m pip install gevent
;;
esac
$VENV_PYTHON -m pip install --upgrade pip==9.0.1
$VENV_PYTHON -m pip install setuptools
$VENV_PYTHON -m pip install cython

@ -22,7 +22,7 @@ PYTHON=$(realpath "${1:-py27/bin/python}")
ROOT=$(pwd)
$PYTHON "$ROOT/src/python/grpcio_tests/setup.py" test_lite
$PYTHON "$ROOT/src/python/grpcio_tests/setup.py" "$2"
mkdir -p "$ROOT/reports"
rm -rf "$ROOT/reports/python-coverage"

@ -545,13 +545,13 @@ class PythonLanguage:
def client_cmd(self, args):
return [
'py27/bin/python', 'src/python/grpcio_tests/setup.py',
'py27_native/bin/python', 'src/python/grpcio_tests/setup.py',
'run_interop', '--client', '--args="{}"'.format(' '.join(args))
]
def client_cmd_http2interop(self, args):
return [
'py27/bin/python',
'py27_native/bin/python',
'src/python/grpcio_tests/tests/http2/negative_http2_client.py',
] + args
@ -560,7 +560,7 @@ class PythonLanguage:
def server_cmd(self, args):
return [
'py27/bin/python', 'src/python/grpcio_tests/setup.py',
'py27_native/bin/python', 'src/python/grpcio_tests/setup.py',
'run_interop', '--server', '--args="{}"'.format(' '.join(args))
]

@ -204,19 +204,28 @@ def _is_use_docker_child():
_PythonConfigVars = collections.namedtuple('_ConfigVars', [
'shell', 'builder', 'builder_prefix_arguments', 'venv_relative_python',
'toolchain', 'runner'
'shell',
'builder',
'builder_prefix_arguments',
'venv_relative_python',
'toolchain',
'runner',
'test_name',
'iomgr_platform',
])
def _python_config_generator(name, major, minor, bits, config_vars):
name += '_' + config_vars.iomgr_platform
return PythonConfig(
name, config_vars.shell + config_vars.builder +
config_vars.builder_prefix_arguments + [
_python_pattern_function(major=major, minor=minor, bits=bits)
] + [name] + config_vars.venv_relative_python + config_vars.toolchain,
config_vars.shell + config_vars.runner +
[os.path.join(name, config_vars.venv_relative_python[0])])
config_vars.shell + config_vars.runner + [
os.path.join(name, config_vars.venv_relative_python[0]),
config_vars.test_name
])
def _pypy_config_generator(name, major, config_vars):
@ -281,7 +290,7 @@ class CLanguage(object):
self._docker_distro, self._make_options = self._compiler_options(
self.args.use_docker, self.args.compiler)
if args.iomgr_platform == "uv":
cflags = '-DGRPC_UV -DGRPC_CUSTOM_IOMGR_THREAD_CHECK '
cflags = '-DGRPC_UV -DGRPC_CUSTOM_IOMGR_THREAD_CHECK -DGRPC_CUSTOM_SOCKET '
try:
cflags += subprocess.check_output(
['pkg-config', '--cflags', 'libuv']).strip() + ' '
@ -770,12 +779,16 @@ class PythonLanguage(object):
venv_relative_python = ['bin/python']
toolchain = ['unix']
test_command = 'test_lite'
if args.iomgr_platform == 'gevent':
test_command = 'test_gevent'
runner = [
os.path.abspath('tools/run_tests/helper_scripts/run_python.sh')
]
config_vars = _PythonConfigVars(shell, builder,
builder_prefix_arguments,
venv_relative_python, toolchain, runner)
config_vars = _PythonConfigVars(
shell, builder, builder_prefix_arguments, venv_relative_python,
toolchain, runner, test_command, args.iomgr_platform)
python27_config = _python_config_generator(
name='py27',
major='2',
@ -1341,7 +1354,7 @@ argp.add_argument(
)
argp.add_argument(
'--iomgr_platform',
choices=['native', 'uv'],
choices=['native', 'uv', 'gevent'],
default='native',
help='Selects iomgr platform to build on')
argp.add_argument(

@ -114,7 +114,7 @@ def _workspace_jobspec(name,
def _generate_jobs(languages,
configs,
platforms,
iomgr_platform='native',
iomgr_platforms=['native'],
arch=None,
compiler=None,
labels=[],
@ -125,40 +125,43 @@ def _generate_jobs(languages,
result = []
for language in languages:
for platform in platforms:
for config in configs:
name = '%s_%s_%s_%s' % (language, platform, config,
iomgr_platform)
runtests_args = [
'-l', language, '-c', config, '--iomgr_platform',
iomgr_platform
]
if arch or compiler:
name += '_%s_%s' % (arch, compiler)
runtests_args += ['--arch', arch, '--compiler', compiler]
if '--build_only' in extra_args:
name += '_buildonly'
for extra_env in extra_envs:
name += '_%s_%s' % (extra_env, extra_envs[extra_env])
runtests_args += extra_args
if platform == 'linux':
job = _docker_jobspec(
name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds)
else:
job = _workspace_jobspec(
name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds)
job.labels = [platform, config, language, iomgr_platform
] + labels
result.append(job)
for iomgr_platform in iomgr_platforms:
for config in configs:
name = '%s_%s_%s_%s' % (language, platform, config,
iomgr_platform)
runtests_args = [
'-l', language, '-c', config, '--iomgr_platform',
iomgr_platform
]
if arch or compiler:
name += '_%s_%s' % (arch, compiler)
runtests_args += [
'--arch', arch, '--compiler', compiler
]
if '--build_only' in extra_args:
name += '_buildonly'
for extra_env in extra_envs:
name += '_%s_%s' % (extra_env, extra_envs[extra_env])
runtests_args += extra_args
if platform == 'linux':
job = _docker_jobspec(
name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds)
else:
job = _workspace_jobspec(
name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds)
job.labels = [platform, config, language, iomgr_platform
] + labels
result.append(job)
return result
@ -184,13 +187,22 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
test_jobs += _generate_jobs(
languages=['csharp', 'python'],
languages=['csharp'],
configs=['dbg', 'opt'],
platforms=['linux', 'macos', 'windows'],
labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
test_jobs += _generate_jobs(
languages=['python'],
configs=['opt'],
platforms=['linux', 'macos', 'windows'],
iomgr_platforms=['native', 'gevent'],
labels=['basictests', 'multilang'],
extra_args=extra_args,
inner_jobs=inner_jobs)
# supported on linux and mac.
test_jobs += _generate_jobs(
languages=['c++'],
@ -377,7 +389,7 @@ def _create_portability_test_jobs(extra_args=[],
languages=['c'],
configs=['dbg'],
platforms=['linux'],
iomgr_platform='uv',
iomgr_platforms=['uv'],
labels=['portability', 'corelang'],
extra_args=extra_args,
inner_jobs=inner_jobs,

Loading…
Cancel
Save