Reimplement Gevent Integration (#28276)

* WIP

* Add gevent test suite run under Bazel.

* Fix things up

* Yapf

* Fix up Bazel files

* Make py_grpc_test fancier

* Attempt to fix Windows RBE

* Attempt to kick GitHub

* Fix Python 2 runs

* Yet more fixes

* And the patch file too

* I am an idiot

* Mark gevent tests flaky

* Try to make rules_python more tolerant

* Typo

* Exclude reconnect test from gevent

* Remove unnecessary parts of patch

* Buildifier

* You saw nothing

* isort

* Move py_grpc_test to an internal-only file

* Review comments

* More reviewer comments

* Review

* Add initial changes for gevent

* WIP. Run completion_queue_next in a threadpool

* WIP.

* WIP

* Re-remove skip annotation

* Finally working

* Reactivate tests

* Clean up

* Move C++ threading utilities to grpc.pxi

* Unbreak sync stack

* Refix test flake

* WIP. Trying to get things working properly

* Move test stuff to test runner

* Clean up

* Can't handle exceptions if you don't compile with exceptions

* Remove debug stuff unintentionally left in

* Add greenlet switch logging and fix threading issue

* Only run a greenlet-scheduling greenlet when there are open channels

* Format

* Add threadpool modifications to old runner

* And actually import gevent
Branch: pull/28853/head
Author: Richard Belleville (committed via GitHub, 3 years ago)
Parent: 09e7e7456b
Commit: 27bc6fe779
25 changed files (lines changed per file):

1. bazel/_gevent_test_main.py (55)
2. bazel/gevent_test.bzl (3)
3. src/python/grpcio/grpc/_channel.py (4)
4. src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi (18)
5. src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi (4)
6. src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi (36)
7. src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi (33)
8. src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi (28)
9. src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi (506)
10. src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi (132)
11. src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi (63)
12. src/python/grpcio/grpc/_cython/cygrpc.pxd (1)
13. src/python/grpcio/grpc/_cython/cygrpc.pyx (2)
14. src/python/grpcio_tests/commands.py (11)
15. src/python/grpcio_tests/tests/unit/BUILD.bazel (4)
16. src/python/grpcio_tests/tests/unit/_compression_test.py (3)
17. src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py (11)
18. src/python/grpcio_tests/tests/unit/_local_credentials_test.py (4)
19. src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py (2)
20. src/python/grpcio_tests/tests/unit/_reconnect_test.py (3)
21. src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py (5)
22. src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py (3)
23. src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py (2)
24. src/python/grpcio_tests/tests/unit/_signal_client.py (12)
25. src/python/grpcio_tests/tests/unit/_signal_handling_test.py (22)

@@ -12,13 +12,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import gevent
from gevent import monkey
monkey.patch_all()
threadpool = gevent.hub.get_hub().threadpool
# Currently, each channel corresponds to a single native thread in the
# gevent threadpool. Thus, when the unit test suite spins up hundreds of
# channels concurrently, some will be starved out, causing the test to
# increase in duration. We increase the max size here so this does not
# happen.
threadpool.maxsize = 1024
threadpool.size = 32
import traceback, signal
from typing import Sequence
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
import gevent
import greenlet
import datetime
import grpc
import unittest
import sys
import os
import pkgutil
from typing import Sequence
def trace_callback(event, args):
if event in ("switch", "throw"):
origin, target = args
sys.stderr.write("{} Transfer from {} to {} with {}\n".format(datetime.datetime.now(), origin, target, event))
else:
sys.stderr.write("Unknown event {}.\n".format(event))
sys.stderr.flush()
if os.getenv("GREENLET_TRACE") is not None:
greenlet.settrace(trace_callback)
def debug(sig, frame):
d={'_frame':frame}
d.update(frame.f_globals)
d.update(frame.f_locals)
sys.stderr.write("Traceback:\n{}".format("\n".join(traceback.format_stack(frame))))
import gevent.util; gevent.util.print_run_info()
sys.stderr.flush()
signal.signal(signal.SIGTERM, debug)
class SingleLoader(object):
def __init__(self, pattern: str):
@@ -38,13 +84,6 @@ class SingleLoader(object):
return self.suite
if __name__ == "__main__":
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
import gevent
if len(sys.argv) != 2:
print(f"USAGE: {sys.argv[0]} TARGET_MODULE", file=sys.stderr)

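For context, the test runner above depends on a strict activation order: gevent must monkey-patch the standard library before grpc is imported, and init_gevent() must run before any channel or server is created. A minimal sketch of that order using the public grpc.experimental.gevent entry point; the thread-pool sizing mirrors the runner above and is only needed when many channels are opened concurrently:

# Sketch of the activation order used by the new test runner.
from gevent import monkey
monkey.patch_all()          # must happen before importing grpc

import gevent.hub
threadpool = gevent.hub.get_hub().threadpool
threadpool.maxsize = 1024   # each open channel currently occupies one worker thread
threadpool.size = 32

import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()   # must happen before creating channels

import grpc                 # blocking-style gRPC calls now cooperate with gevent
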
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Houses py_grpc_gevent_test.
"""
@@ -81,6 +80,6 @@ def py_grpc_gevent_test(
srcs = [copied_main_filename],
main = copied_main_filename,
python_version = "PY3",
flaky = True,
flaky = False,
**kwargs
)

@@ -1481,6 +1481,8 @@ class Channel(grpc.Channel):
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
cygrpc.fork_register_channel(self)
if cygrpc.g_gevent_activated:
cygrpc.gevent_increment_channel_count()
def _process_python_options(self, python_options):
"""Sets channel attributes according to python-only channel options."""
@@ -1547,6 +1549,8 @@ class Channel(grpc.Channel):
self._unsubscribe_all()
self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
cygrpc.fork_unregister_channel(self)
if cygrpc.g_gevent_activated:
cygrpc.gevent_decrement_channel_count()
def _close_on_fork(self):
self._unsubscribe_all()

@@ -12,24 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython
# links it with exception handling. It introduces new dependencies.
cdef extern from "<queue>" namespace "std" nogil:
cdef cppclass queue[T]:
queue()
bint empty()
T& front()
void pop()
void push(T&)
size_t size()
cdef extern from "<mutex>" namespace "std" nogil:
cdef cppclass mutex:
mutex()
void lock()
void unlock()
ctypedef queue[grpc_event] cpp_event_queue

@@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
cdef int g_interrupt_check_period_ms = 200
cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *
cdef _interpret_event(grpc_event c_event)
cdef class _LatentEventArg:
cdef grpc_completion_queue *c_completion_queue
cdef object deadline
cdef class CompletionQueue:

@@ -13,14 +13,12 @@
# limitations under the License.
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *:
global g_interrupt_check_period_ms
cdef gpr_timespec c_increment
cdef gpr_timespec c_timeout
cdef gpr_timespec c_deadline
c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
c_increment = gpr_time_from_millis(g_interrupt_check_period_ms, GPR_TIMESPAN)
if deadline is None:
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
else:
@@ -33,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep
c_timeout = c_deadline
c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
if (c_event.type != GRPC_QUEUE_TIMEOUT or
gpr_time_cmp(c_timeout, c_deadline) == 0):
break
@@ -42,7 +40,6 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep
cpython.PyErr_CheckSignals()
return c_event
cdef _interpret_event(grpc_event c_event):
cdef _Tag tag
if c_event.type == GRPC_QUEUE_TIMEOUT:
@@ -58,11 +55,25 @@ cdef _interpret_event(grpc_event c_event):
cpython.Py_DECREF(tag)
return tag, tag.event(c_event)
cdef _internal_latent_event(_LatentEventArg latent_event_arg):
cdef grpc_event c_event = _next(latent_event_arg.c_completion_queue, latent_event_arg.deadline)
return _interpret_event(c_event)
cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
cdef grpc_event c_event = _next(c_completion_queue, deadline)
return _interpret_event(c_event)
global g_gevent_activated
latent_event_arg = _LatentEventArg()
latent_event_arg.c_completion_queue = c_completion_queue
latent_event_arg.deadline = deadline
if g_gevent_activated:
# For gevent, completion_queue_next is run in a native thread pool.
global g_gevent_threadpool
result = g_gevent_threadpool.apply(_internal_latent_event, (latent_event_arg,))
return result
else:
return _internal_latent_event(latent_event_arg)
cdef class CompletionQueue:
@@ -87,10 +98,17 @@ cdef class CompletionQueue:
self.is_shutdown = True
return event
def _internal_poll(self, deadline):
return self._interpret_event(_next(self.c_completion_queue, deadline))
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
def poll(self, deadline=None):
return self._interpret_event(_next(self.c_completion_queue, deadline))
global g_gevent_activated
if g_gevent_activated:
return g_gevent_threadpool.apply(CompletionQueue._internal_poll, (self, deadline))
else:
return self._internal_poll(deadline)
def shutdown(self):
with nogil:

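The completion-queue changes above follow one pattern: when the gevent integration is active, the blocking call into grpc_completion_queue_next is handed to gevent's native thread pool via threadpool.apply, so only the calling greenlet waits while the hub keeps scheduling everything else. A minimal stand-alone sketch of that offloading pattern, where blocking_poll is a hypothetical stand-in for the C-level poll:

import time
import gevent.hub

def blocking_poll(timeout):
    # Hypothetical stand-in for grpc_completion_queue_next: it blocks the
    # OS thread it runs on until an event (here, a timer) is ready.
    time.sleep(timeout)
    return "event"

# apply() runs the callable on a native worker thread and blocks only the
# calling greenlet until the result is available.
threadpool = gevent.hub.get_hub().threadpool
result = threadpool.apply(blocking_poll, (0.1,))
print(result)
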
@@ -25,6 +25,37 @@ ctypedef unsigned short uint16_t
ctypedef unsigned int uint32_t
ctypedef unsigned long long uint64_t
# C++ Utilities
# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython
# links it with exception handling. It introduces new dependencies.
cdef extern from "<queue>" namespace "std" nogil:
cdef cppclass queue[T]:
queue()
bint empty()
T& front()
T& back()
void pop()
void push(T&)
size_t size()
cdef extern from "<mutex>" namespace "std" nogil:
cdef cppclass mutex:
mutex()
void lock()
void unlock()
cdef cppclass unique_lock[Mutex]:
unique_lock(Mutex&)
cdef extern from "<condition_variable>" namespace "std" nogil:
cdef cppclass condition_variable:
condition_variable()
void notify_all()
void wait(unique_lock[mutex]&)
# gRPC Core Declarations
cdef extern from "grpc/support/alloc.h":
@@ -700,3 +731,5 @@ cdef extern from "grpc/grpc_security_constants.h":
cdef extern from "src/core/lib/iomgr/error.h":
ctypedef grpc_error* grpc_error_handle
grpc_error_handle GRPC_ERROR_CANCELLED
struct grpc_error:
pass

@@ -13,29 +13,9 @@
# limitations under the License.
# distutils: language=c++
cdef class TimerWrapper:
g_gevent_threadpool = None
g_gevent_activated = False
cdef grpc_custom_timer *c_timer
cdef object timer
cdef object event
cpdef void gevent_increment_channel_count()
cdef class SocketWrapper:
cdef object sockopts
cdef object socket
cdef object closed
cdef grpc_custom_socket *c_socket
cdef char* c_buffer
cdef size_t len
cdef grpc_custom_socket *accepting_socket
cdef grpc_custom_connect_callback connect_cb
cdef grpc_custom_write_callback write_cb
cdef grpc_custom_read_callback read_cb
cdef grpc_custom_accept_callback accept_cb
cdef grpc_custom_close_callback close_cb
cdef class ResolveWrapper:
cdef grpc_custom_resolver *c_resolver
cdef const char* c_host
cdef const char* c_port
cpdef void gevent_decrement_channel_count()

@@ -14,418 +14,124 @@
# distutils: language=c++
from libc cimport string
import errno
import sys
gevent_g = None
gevent_socket = None
gevent_hub = None
gevent_event = None
g_event = None
g_pool = None
def _spawn_greenlet(*args):
greenlet = g_pool.spawn(*args)
###############################
### socket implementation ###
###############################
cdef class SocketWrapper:
def __cinit__(self):
fork_handlers_and_grpc_init()
self.sockopts = []
self.socket = None
self.c_socket = NULL
self.c_buffer = NULL
self.len = 0
def __dealloc__(self):
grpc_shutdown()
cdef grpc_error_handle socket_init(grpc_custom_socket* socket, int domain) with gil:
sw = SocketWrapper()
sw.c_socket = socket
sw.sockopts = []
cpython.Py_INCREF(sw)
# Python doesn't support AF_UNSPEC sockets, so we defer creation until
# bind/connect when we know what type of socket we need
sw.socket = None
sw.closed = False
sw.accepting_socket = NULL
socket.impl = <void*>sw
return grpc_error_none()
cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
try:
socket_wrapper.socket.connect(addr_tuple)
socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
grpc_error_none())
except IOError as io_error:
socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
socket_error("connect", str(io_error)))
g_event.set()
def socket_connect_async(socket_wrapper, addr_tuple):
socket_connect_async_cython(socket_wrapper, addr_tuple)
cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback cb) with gil:
py_socket = None
socket_wrapper = <SocketWrapper>socket.impl
socket_wrapper.connect_cb = cb
addr_tuple = sockaddr_to_tuple(addr, addr_len)
if sockaddr_is_ipv4(addr, addr_len):
py_socket = gevent_socket.socket(gevent_socket.AF_INET)
else:
py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
applysockopts(py_socket)
socket_wrapper.socket = py_socket
_spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
cdef void socket_destroy(grpc_custom_socket* socket) with gil:
cpython.Py_DECREF(<SocketWrapper>socket.impl)
cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
try:
(<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
except IOError as io_error:
if io_error.errno != errno.ENOTCONN:
raise io_error
cdef void socket_close(grpc_custom_socket* socket,
grpc_custom_close_callback cb) with gil:
socket_wrapper = (<SocketWrapper>socket.impl)
if socket_wrapper.socket is not None:
socket_wrapper.socket.close()
socket_wrapper.closed = True
socket_wrapper.close_cb = cb
# Delay the close callback until the accept() call has picked it up
if socket_wrapper.accepting_socket != NULL:
return
socket_wrapper.close_cb(socket)
def socket_sendmsg(socket, write_bytes):
try:
return socket.sendmsg(write_bytes)
except AttributeError:
# sendmsg not available on all Pythons/Platforms
return socket.send(b''.join(write_bytes))
cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
try:
while write_bytes:
sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
while sent_byte_count > 0:
if sent_byte_count < len(write_bytes[0]):
write_bytes[0] = write_bytes[0][sent_byte_count:]
sent_byte_count = 0
else:
sent_byte_count -= len(write_bytes[0])
write_bytes = write_bytes[1:]
socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
grpc_error_none())
except IOError as io_error:
socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
socket_error("send", str(io_error)))
g_event.set()
def socket_write_async(socket_wrapper, write_bytes):
socket_write_async_cython(socket_wrapper, write_bytes)
cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
grpc_custom_write_callback cb) with gil:
cdef char* start
sw = <SocketWrapper>socket.impl
sw.write_cb = cb
write_bytes = []
for i in range(buffer.count):
start = grpc_slice_buffer_start(buffer, i)
length = grpc_slice_buffer_length(buffer, i)
write_bytes.append(<bytes>start[:length])
_spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes)
cdef socket_read_async_cython(SocketWrapper socket_wrapper):
cdef char* buff_char_arr
try:
buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
buff_char_arr = buff_str
string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
len(buff_str), grpc_error_none())
except IOError as io_error:
socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-1, socket_error("recv", str(io_error)))
g_event.set()
def socket_read_async(socket_wrapper):
socket_read_async_cython(socket_wrapper)
cdef void socket_read(grpc_custom_socket* socket, char* buffer,
size_t length, grpc_custom_read_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.read_cb = cb
sw.c_buffer = buffer
sw.len = length
_spawn_greenlet(socket_read_async, sw)
cdef grpc_error_handle socket_getpeername(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) with gil:
cdef char* src_buf
peer = (<SocketWrapper>socket.impl).socket.getpeername()
cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error_handle socket_getsockname(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) with gil:
cdef char* src_buf
cdef grpc_resolved_address c_addr
if (<SocketWrapper>socket.impl).socket is None:
peer = ('0.0.0.0', 0)
else:
peer = (<SocketWrapper>socket.impl).socket.getsockname()
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
def applysockopts(s):
s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)
cdef grpc_error_handle socket_bind(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
addr_tuple = sockaddr_to_tuple(addr, len)
try:
try:
py_socket = gevent_socket.socket(gevent_socket.AF_INET)
applysockopts(py_socket)
py_socket.bind(addr_tuple)
except gevent_socket.gaierror as e:
py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
applysockopts(py_socket)
py_socket.bind(addr_tuple)
(<SocketWrapper>socket.impl).socket = py_socket
except IOError as io_error:
return socket_error("bind", str(io_error))
else:
return grpc_error_none()
cdef grpc_error_handle socket_listen(grpc_custom_socket* socket) with gil:
(<SocketWrapper>socket.impl).socket.listen(50)
return grpc_error_none()
cdef void accept_callback_cython(SocketWrapper s) except *:
try:
conn, address = s.socket.accept()
sw = SocketWrapper()
sw.closed = False
sw.c_socket = s.accepting_socket
sw.sockopts = []
sw.socket = conn
sw.c_socket.impl = <void*>sw
sw.accepting_socket = NULL
cpython.Py_INCREF(sw)
s.accepting_socket = NULL
s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
except IOError as io_error:
#TODO actual error
s.accepting_socket = NULL
s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
socket_error("accept", str(io_error)))
if s.closed:
s.close_cb(<grpc_custom_socket*>s.c_socket)
g_event.set()
def socket_accept_async(s):
accept_callback_cython(s)
from cython.operator cimport dereference
cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.accepting_socket = client
sw.accept_cb = cb
_spawn_greenlet(socket_accept_async, sw)
from cpython cimport Py_INCREF, Py_DECREF
#####################################
######Resolver implementation #######
#####################################
cdef class ResolveWrapper:
def __cinit__(self):
fork_handlers_and_grpc_init()
self.c_resolver = NULL
self.c_host = NULL
self.c_port = NULL
def __dealloc__(self):
grpc_shutdown()
cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
try:
res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
tuples_to_resolvaddr(res), grpc_error_none())
except IOError as io_error:
grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
<grpc_resolved_addresses*>0,
socket_error("getaddrinfo", str(io_error)))
g_event.set()
def socket_resolve_async_python(resolve_wrapper):
socket_resolve_async_cython(resolve_wrapper)
cdef void socket_resolve_async(grpc_custom_resolver* r, const char* host, const char* port) with gil:
rw = ResolveWrapper()
rw.c_resolver = r
rw.c_host = host
rw.c_port = port
_spawn_greenlet(socket_resolve_async_python, rw)
cdef grpc_error_handle socket_resolve(const char* host, const char* port,
grpc_resolved_addresses** res) with gil:
try:
result = gevent_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)
return grpc_error_none()
except IOError as io_error:
return socket_error("getaddrinfo", str(io_error))
###############################
### timer implementation ######
###############################
cdef class TimerWrapper:
def __cinit__(self, deadline):
fork_handlers_and_grpc_init()
self.timer = gevent_hub.get_hub().loop.timer(deadline)
self.event = None
def start(self):
self.event = gevent_event.Event()
self.timer.start(self.on_finish)
def on_finish(self):
grpc_custom_timer_callback(self.c_timer, grpc_error_none())
self.timer.stop()
g_event.set()
def stop(self):
self.event.set()
self.timer.stop()
def __dealloc__(self):
grpc_shutdown()
cdef void timer_start(grpc_custom_timer* t) with gil:
timer = TimerWrapper(t.timeout_ms / 1000.0)
timer.c_timer = t
t.timer = <void*>timer
timer.start()
cdef void timer_stop(grpc_custom_timer* t) with gil:
time_wrapper = <object>t.timer
time_wrapper.stop()
###############################
### pollset implementation ###
###############################
cdef void init_loop() with gil:
pass
cdef void destroy_loop() with gil:
g_pool.join()
cdef void kick_loop() with gil:
g_event.set()
def _run_loop(timeout_ms):
timeout = timeout_ms / 1000.0
if timeout_ms > 0:
try:
g_event.wait(timeout)
finally:
g_event.clear()
cdef grpc_error_handle run_loop(size_t timeout_ms) with gil:
try:
_run_loop(timeout_ms)
return grpc_error_none()
except BaseException:
exc_info = sys.exc_info()
# Avoid running any Python code after setting the exception
cpython.PyErr_SetObject(exc_info[0], exc_info[1])
return GRPC_ERROR_CANCELLED
###############################
### Initializer ###############
###############################
import atexit
import errno
import sys
cdef grpc_socket_vtable gevent_socket_vtable
cdef grpc_custom_resolver_vtable gevent_resolver_vtable
cdef grpc_custom_timer_vtable gevent_timer_vtable
cdef grpc_custom_poller_vtable gevent_pollset_vtable
gevent_hub = None
g_gevent_pool = None
g_gevent_threadpool = None
g_gevent_activated = False
cdef queue[void*] g_greenlets_to_run
cdef condition_variable g_greenlets_cv
cdef mutex g_greenlets_mu
cdef bint g_shutdown_greenlets_to_run_queue = False
cdef int g_channel_count = 0
cdef _submit_to_greenlet_queue(object cb, tuple args):
cdef tuple to_call = (cb,) + args
cdef unique_lock[mutex]* lk
Py_INCREF(to_call)
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_greenlets_to_run.push(<void*>(to_call))
del lk
g_greenlets_cv.notify_all()
cpdef void gevent_increment_channel_count():
global g_channel_count
cdef int old_channel_count
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
old_channel_count = g_channel_count
g_channel_count += 1
del lk
if old_channel_count == 0:
run_spawn_greenlets()
cpdef void gevent_decrement_channel_count():
global g_channel_count
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_channel_count -= 1
if g_channel_count == 0:
g_greenlets_cv.notify_all()
del lk
cdef object await_next_greenlet():
cdef unique_lock[mutex]* lk
with nogil:
# Cython doesn't allow us to do proper stack allocations, so we can't take
# advantage of RAII.
lk = new unique_lock[mutex](g_greenlets_mu)
while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
if not g_greenlets_to_run.empty():
break
g_greenlets_cv.wait(dereference(lk))
if g_channel_count == 0:
del lk
return None
if g_shutdown_greenlets_to_run_queue:
del lk
return None
cdef object to_call = <object>g_greenlets_to_run.front()
Py_DECREF(to_call)
g_greenlets_to_run.pop()
del lk
return to_call
def spawn_greenlets():
while True:
to_call = g_gevent_threadpool.apply(await_next_greenlet, ())
if to_call is None:
break
fn = to_call[0]
args = to_call[1:]
fn(*args)
def run_spawn_greenlets():
g_gevent_pool.spawn(spawn_greenlets)
def shutdown_await_next_greenlet():
global g_shutdown_greenlets_to_run_queue
cdef unique_lock[mutex]* lk
with nogil:
lk = new unique_lock[mutex](g_greenlets_mu)
g_shutdown_greenlets_to_run_queue = True
del lk
g_greenlets_cv.notify_all()
def init_grpc_gevent():
# Lazily import gevent
global gevent_socket
global gevent_g
global gevent_hub
global gevent_event
global g_event
global g_pool
global g_gevent_threadpool
global g_gevent_activated
global g_interrupt_check_period_ms
global g_gevent_pool
import gevent
gevent_g = gevent
import gevent.socket
gevent_socket = gevent.socket
import gevent.hub
gevent_hub = gevent.hub
import gevent.event
gevent_event = gevent.event
import gevent.pool
g_event = gevent.event.Event()
g_pool = gevent.pool.Group()
def cb_func(cb, args):
_spawn_greenlet(cb, *args)
set_async_callback_func(cb_func)
gevent_hub = gevent.hub
g_gevent_threadpool = gevent_hub.get_hub().threadpool
gevent_resolver_vtable.resolve = socket_resolve
gevent_resolver_vtable.resolve_async = socket_resolve_async
g_gevent_activated = True
g_interrupt_check_period_ms = 2000
gevent_socket_vtable.init = socket_init
gevent_socket_vtable.connect = socket_connect
gevent_socket_vtable.destroy = socket_destroy
gevent_socket_vtable.shutdown = socket_shutdown
gevent_socket_vtable.close = socket_close
gevent_socket_vtable.write = socket_write
gevent_socket_vtable.read = socket_read
gevent_socket_vtable.getpeername = socket_getpeername
gevent_socket_vtable.getsockname = socket_getsockname
gevent_socket_vtable.bind = socket_bind
gevent_socket_vtable.listen = socket_listen
gevent_socket_vtable.accept = socket_accept
g_gevent_pool = gevent.pool.Group()
gevent_timer_vtable.start = timer_start
gevent_timer_vtable.stop = timer_stop
gevent_pollset_vtable.init = init_loop
gevent_pollset_vtable.poll = run_loop
gevent_pollset_vtable.kick = kick_loop
gevent_pollset_vtable.shutdown = destroy_loop
set_async_callback_func(_submit_to_greenlet_queue)
grpc_custom_iomgr_init(&gevent_socket_vtable,
&gevent_resolver_vtable,
&gevent_timer_vtable,
&gevent_pollset_vtable)
# TODO: Document how this all works.
atexit.register(shutdown_await_next_greenlet)

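The rewritten grpc_gevent.pyx.pxi above replaces the old custom-iomgr socket, resolver, timer, and pollset shims with a much smaller handoff layer: core code running on native threads pushes callbacks into a C++ queue guarded by a mutex and condition variable, and a single scheduling greenlet, started while at least one channel is open, pulls the next callback through the thread pool and runs it on the hub. A simplified pure-Python model of that handoff, with illustrative names standing in for the cygrpc-level symbols:

import queue
import gevent.hub
import gevent.pool

work_queue = queue.Queue()                    # models g_greenlets_to_run
pool = gevent.pool.Group()                    # models g_gevent_pool
threadpool = gevent.hub.get_hub().threadpool  # models g_gevent_threadpool

def submit(callback, *args):
    # Called from native threads (the async-callback hook handed to core);
    # queue.Queue supplies the locking that the C++ mutex/condvar provide.
    work_queue.put((callback, args))

def scheduling_greenlet():
    while True:
        # Wait on a native worker thread, not on the hub, until work arrives.
        item = threadpool.apply(work_queue.get, ())
        if item is None:                      # models the shutdown/zero-channel path
            break
        callback, args = item
        callback(*args)                       # run the callback in greenlet context

pool.spawn(scheduling_greenlet)               # models run_spawn_greenlets()
# At shutdown (or once the last channel closes): work_queue.put(None)
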
@@ -1,132 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
from libcpp cimport bool as bool_t
from libcpp.string cimport string as cppstring
cdef extern from "grpc/impl/codegen/slice.h":
struct grpc_slice_buffer:
int count
cdef extern from "src/core/lib/iomgr/error.h":
struct grpc_error:
pass
ctypedef grpc_error* grpc_error_handle
# TODO(https://github.com/grpc/grpc/issues/20135) Change the filename
# for something more meaningful.
cdef extern from "src/core/lib/iomgr/python_util.h":
grpc_error_handle grpc_socket_error(char* error)
char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
cdef extern from "src/core/lib/iomgr/sockaddr.h":
ctypedef struct grpc_sockaddr:
pass
cdef extern from "src/core/lib/iomgr/resolve_address.h":
ctypedef struct grpc_resolved_addresses:
size_t naddrs
grpc_resolved_address* addrs
ctypedef struct grpc_resolved_address:
char[128] addr
size_t len
cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
struct grpc_custom_resolver:
pass
struct grpc_custom_resolver_vtable:
grpc_error_handle (*resolve)(const char* host, const char* port, grpc_resolved_addresses** res);
void (*resolve_async)(grpc_custom_resolver* resolver, const char* host, const char* port);
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
grpc_resolved_addresses* result,
grpc_error_handle error);
cdef extern from "src/core/lib/iomgr/tcp_custom.h":
cdef int GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT
struct grpc_custom_socket:
void* impl
# We don't care about the rest of the fields
ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
grpc_error_handle error)
ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
grpc_error_handle error)
ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
size_t nread, grpc_error_handle error)
ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
grpc_custom_socket* client,
grpc_error_handle error)
ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
struct grpc_socket_vtable:
grpc_error_handle (*init)(grpc_custom_socket* socket, int domain);
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, grpc_custom_connect_callback cb);
void (*destroy)(grpc_custom_socket* socket);
void (*shutdown)(grpc_custom_socket* socket);
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
grpc_custom_write_callback cb);
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
grpc_custom_read_callback cb);
grpc_error_handle (*getpeername)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error_handle (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len);
grpc_error_handle (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags);
grpc_error_handle (*listen)(grpc_custom_socket* socket);
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb);
cdef extern from "src/core/lib/iomgr/timer_custom.h":
struct grpc_custom_timer:
void* timer
int timeout_ms
# We don't care about the rest of the fields
struct grpc_custom_timer_vtable:
void (*start)(grpc_custom_timer* t);
void (*stop)(grpc_custom_timer* t);
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error_handle error);
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)()
grpc_error_handle (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()
cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
grpc_custom_timer_vtable* timer,
grpc_custom_poller_vtable* poller);
cdef extern from "src/core/lib/address_utils/sockaddr_utils.h":
int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
cppstring grpc_sockaddr_to_string(const grpc_resolved_address *addr,
bool_t normalize);
int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
int port)
const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)
cdef extern from "src/core/lib/address_utils/parse_address.h":
grpc_error_handle grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);

@@ -1,63 +0,0 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++
from libc cimport string
from libc.stdlib cimport malloc
from libcpp.string cimport string as cppstring
cdef grpc_error_handle grpc_error_none():
return <grpc_error_handle>0
cdef grpc_error_handle socket_error(str syscall, str err):
error_str = "{} failed: {}".format(syscall, err)
error_bytes = str_to_bytes(error_str)
return grpc_socket_error(error_bytes)
cdef resolved_addr_to_tuple(grpc_resolved_address* address):
cdef cppstring res_str
port = grpc_sockaddr_get_port(address)
res_str = grpc_sockaddr_to_string(address, False)
byte_str = _decode(res_str)
if byte_str.endswith(':' + str(port)):
byte_str = byte_str[:(0 - len(str(port)) - 1)]
byte_str = byte_str.lstrip('[')
byte_str = byte_str.rstrip(']')
byte_str = '{}'.format(byte_str)
return byte_str, port
cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return resolved_addr_to_tuple(&c_addr)
cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
cdef grpc_resolved_address c_addr
string.memcpy(<void*>c_addr.addr, <void*> address, length)
c_addr.len = length
return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
cdef grpc_resolved_addresses* addresses
tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
addresses.naddrs = len(tups_set)
addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
i = 0
for tup in set(tups_set):
hostname = str_to_bytes(tup[0])
grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
i += 1
return addresses

@@ -34,7 +34,6 @@ include "_cygrpc/time.pxd.pxi"
include "_cygrpc/vtable.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
include "_cygrpc/iomgr.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"

@@ -56,8 +56,6 @@ include "_cygrpc/time.pyx.pxi"
include "_cygrpc/vtable.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/iomgr.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
include "_cygrpc/thread.pyx.pxi"

@@ -245,9 +245,20 @@ class TestGevent(setuptools.Command):
pass
def run(self):
import gevent
from gevent import monkey
monkey.patch_all()
threadpool = gevent.hub.get_hub().threadpool
# Currently, each channel corresponds to a single native thread in the
# gevent threadpool. Thus, when the unit test suite spins up hundreds of
# channels concurrently, some will be starved out, causing the test to
# increase in duration. We increase the max size here so this does not
# happen.
threadpool.maxsize = 1024
threadpool.size = 32
import grpc.experimental.gevent
import tests

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:internal_python_rules.bzl", "internal_py_grpc_test")
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
package(default_visibility = ["//visibility:public"])
@@ -64,6 +65,9 @@ py_library(
py_library(
name = "_signal_client",
srcs = ["_signal_client.py"],
deps = [
requirement("gevent"),
],
)
py_library(

@@ -25,7 +25,6 @@ import grpc
from grpc import _grpcio_metadata
from tests.unit import _tcp_proxy
from tests.unit import test_common
from tests.unit.framework.common import test_constants
_UNARY_UNARY = '/test/UnaryUnary'
@@ -259,8 +258,6 @@ def _stream_stream_client(channel, multicallable_kwargs, message):
i, response))
@unittest.skipIf(test_common.running_under_gevent(),
"This test is nondeterministic under gevent.")
class CompressionTest(unittest.TestCase):
def assertCompressed(self, compression_ratio):

@@ -75,7 +75,11 @@ if contextvars_supported():
class TestCallCredentials(grpc.AuthMetadataPlugin):
def __call__(self, context, callback):
if test_var.get() != _EXPECTED_VALUE:
if test_var.get(
) != _EXPECTED_VALUE and not test_common.running_under_gevent():
# contextvars do not work under gevent, but the rest of this
# test is still valuable as a test of concurrent runs of the
# metadata credentials code path.
raise AssertionError("{} != {}".format(test_var.get(),
_EXPECTED_VALUE))
callback((), None)
@@ -97,8 +101,6 @@ else:
# TODO(https://github.com/grpc/grpc/issues/22257)
@unittest.skipIf(os.name == "nt", "LocalCredentials not supported on Windows.")
@unittest.skipIf(test_common.running_under_gevent(),
"ThreadLocals do not work under gevent.")
class ContextVarsPropagationTest(unittest.TestCase):
def test_propagation_to_auth_plugin(self):
@@ -145,7 +147,8 @@ class ContextVarsPropagationTest(unittest.TestCase):
exception_queue.put(e)
threads = []
for _ in range(_RPC_COUNT):
for _ in range(_THREAD_COUNT):
q = queue.Queue()
thread = threading.Thread(target=_run_on_thread, args=(q,))
thread.setDaemon(True)

@@ -19,8 +19,6 @@ import unittest
import grpc
from tests.unit import test_common
class _GenericHandler(grpc.GenericRpcHandler):
@@ -58,8 +56,6 @@ class LocalCredentialsTest(unittest.TestCase):
@unittest.skipIf(os.name == 'nt',
'Unix Domain Socket is not supported on Windows')
@unittest.skipIf(test_common.running_under_gevent(),
'UDS not supported under gevent.')
def test_uds(self):
server_addr = 'unix:/tmp/grpc_fullstack_test'
channel_creds = grpc.local_channel_credentials(

@@ -188,8 +188,6 @@ def _generic_handler(servicer):
return grpc.method_handlers_generic_handler(_SERVICE, method_handlers)
@unittest.skipIf(test_common.running_under_gevent(),
"Causes deadlock in gevent.")
class MetadataCodeDetailsTest(unittest.TestCase):
def setUp(self):

@@ -21,7 +21,6 @@ import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit.framework.common import bound_socket
from tests.unit.framework.common import test_constants
@@ -35,8 +34,6 @@ def _handle_unary_unary(unused_request, unused_servicer_context):
return _RESPONSE
@unittest.skipIf(test_common.running_under_gevent(),
"Test is nondeterministic under gevent.")
class ReconnectTest(unittest.TestCase):
def test_reconnect(self):

@@ -22,7 +22,6 @@ import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit._rpc_test_helpers import BaseRPCTest
from tests.unit._rpc_test_helpers import Callback
from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
@@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable
from tests.unit.framework.common import test_constants
@unittest.skipIf(test_common.running_under_gevent(),
"This test is nondeterministic under gevent.")
class RPCPart1Test(BaseRPCTest, unittest.TestCase):
def testExpiredStreamRequestBlockingUnaryResponse(self):
@@ -238,4 +235,4 @@ class RPCPart1Test(BaseRPCTest, unittest.TestCase):
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)
unittest.main(verbosity=3)

@@ -22,7 +22,6 @@ import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit._rpc_test_helpers import BaseRPCTest
from tests.unit._rpc_test_helpers import Callback
from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
@@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable
from tests.unit.framework.common import test_constants
@unittest.skipIf(test_common.running_under_gevent(),
"Causes deadlock under gevent.")
class RPCPart2Test(BaseRPCTest, unittest.TestCase):
def testDefaultThreadPoolIsUsed(self):

@@ -36,7 +36,7 @@ _STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
_STREAM_STREAM_NON_BLOCKING = '/test/StreamStreamNonBlocking'
TIMEOUT_SHORT = datetime.timedelta(seconds=1).total_seconds()
TIMEOUT_SHORT = datetime.timedelta(seconds=4).total_seconds()
class Callback(object):

@@ -108,7 +108,19 @@ if __name__ == '__main__':
parser.add_argument('--exception',
help='Whether the signal throws an exception',
action='store_true')
parser.add_argument('--gevent',
help='Whether to run under gevent.',
action='store_true')
args = parser.parse_args()
if args.gevent:
from gevent import monkey
import gevent.util
monkey.patch_all()
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
if args.arity == 'unary' and not args.exception:
main_unary(args.server)
elif args.arity == 'streaming' and not args.exception:

@@ -41,6 +41,10 @@ else:
_HOST = 'localhost'
# The gevent test harness cannot run the monkeypatch code for the child process,
# so we need to instrument it manually.
_GEVENT_ARG = ("--gevent",) if test_common.running_under_gevent() else ()
class _GenericHandler(grpc.GenericRpcHandler):
@@ -142,8 +146,8 @@ class SignalHandlingTest(unittest.TestCase):
server_target = '{}:{}'.format(_HOST, self._port)
with tempfile.TemporaryFile(mode='r') as client_stdout:
with tempfile.TemporaryFile(mode='r') as client_stderr:
client = _start_client((server_target, 'unary'), client_stdout,
client_stderr)
client = _start_client((server_target, 'unary') + _GEVENT_ARG,
client_stdout, client_stderr)
self._handler.await_connected_client()
client.send_signal(signal.SIGINT)
self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
@@ -157,8 +161,9 @@ class SignalHandlingTest(unittest.TestCase):
server_target = '{}:{}'.format(_HOST, self._port)
with tempfile.TemporaryFile(mode='r') as client_stdout:
with tempfile.TemporaryFile(mode='r') as client_stderr:
client = _start_client((server_target, 'streaming'),
client_stdout, client_stderr)
client = _start_client(
(server_target, 'streaming') + _GEVENT_ARG, client_stdout,
client_stderr)
self._handler.await_connected_client()
client.send_signal(signal.SIGINT)
self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
@@ -171,8 +176,9 @@ class SignalHandlingTest(unittest.TestCase):
server_target = '{}:{}'.format(_HOST, self._port)
with tempfile.TemporaryFile(mode='r') as client_stdout:
with tempfile.TemporaryFile(mode='r') as client_stderr:
client = _start_client(('--exception', server_target, 'unary'),
client_stdout, client_stderr)
client = _start_client(
('--exception', server_target, 'unary') + _GEVENT_ARG,
client_stdout, client_stderr)
self._handler.await_connected_client()
client.send_signal(signal.SIGINT)
client.wait()
@@ -184,8 +190,8 @@ class SignalHandlingTest(unittest.TestCase):
with tempfile.TemporaryFile(mode='r') as client_stdout:
with tempfile.TemporaryFile(mode='r') as client_stderr:
client = _start_client(
('--exception', server_target, 'streaming'), client_stdout,
client_stderr)
('--exception', server_target, 'streaming') + _GEVENT_ARG,
client_stdout, client_stderr)
self._handler.await_connected_client()
client.send_signal(signal.SIGINT)
client.wait()
