Merge pull request #19440 from grpc/revert-19356-i15880

Revert "Surface exceptions in gevent IO manager"
pull/19420/head^2
Lidi Zheng 6 years ago committed by GitHub
commit 51d6416691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
  2. 42
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi

@ -44,12 +44,12 @@ cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
pass
struct grpc_custom_resolver_vtable:
grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res) except *
void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port) except *
grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
grpc_resolved_addresses* result,
grpc_error* error)
grpc_error* error);
cdef extern from "src/core/lib/iomgr/tcp_custom.h":
struct grpc_custom_socket:
@ -67,25 +67,25 @@ cdef extern from "src/core/lib/iomgr/tcp_custom.h":
ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
struct grpc_socket_vtable:
grpc_error* (*init)(grpc_custom_socket* socket, int domain) except *
grpc_error* (*init)(grpc_custom_socket* socket, int domain);
void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, grpc_custom_connect_callback cb) except *
void (*destroy)(grpc_custom_socket* socket) except *
void (*shutdown)(grpc_custom_socket* socket) except *
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb) except *
size_t len, grpc_custom_connect_callback cb);
void (*destroy)(grpc_custom_socket* socket);
void (*shutdown)(grpc_custom_socket* socket);
void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
grpc_custom_write_callback cb) except *
grpc_custom_write_callback cb);
void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
grpc_custom_read_callback cb) except *
grpc_custom_read_callback cb);
grpc_error* (*getpeername)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len) except *
const grpc_sockaddr* addr, int* len);
grpc_error* (*getsockname)(grpc_custom_socket* socket,
const grpc_sockaddr* addr, int* len) except *
const grpc_sockaddr* addr, int* len);
grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t len, int flags) except *
grpc_error* (*listen)(grpc_custom_socket* socket) except *
size_t len, int flags);
grpc_error* (*listen)(grpc_custom_socket* socket);
void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb) except *
grpc_custom_accept_callback cb);
cdef extern from "src/core/lib/iomgr/timer_custom.h":
struct grpc_custom_timer:
@ -94,17 +94,17 @@ cdef extern from "src/core/lib/iomgr/timer_custom.h":
# We don't care about the rest of the fields
struct grpc_custom_timer_vtable:
void (*start)(grpc_custom_timer* t) except *
void (*stop)(grpc_custom_timer* t) except *
void (*start)(grpc_custom_timer* t);
void (*stop)(grpc_custom_timer* t);
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error)
void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
cdef extern from "src/core/lib/iomgr/pollset_custom.h":
struct grpc_custom_poller_vtable:
void (*init)() except *
void (*poll)(size_t timeout_ms) except *
void (*kick)() except *
void (*shutdown)() except *
void (*init)()
void (*poll)(size_t timeout_ms)
void (*kick)()
void (*shutdown)()
cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,

@ -56,7 +56,7 @@ cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
c_addr.len = length
return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups) except *:
cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
cdef grpc_resolved_addresses* addresses
tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
@ -84,7 +84,7 @@ cdef class SocketWrapper:
self.c_buffer = NULL
self.len = 0
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) except * with gil:
cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
sw = SocketWrapper()
sw.c_socket = socket
sw.sockopts = []
@ -112,7 +112,7 @@ def socket_connect_async(socket_wrapper, addr_tuple):
cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback cb) except * with gil:
grpc_custom_connect_callback cb) with gil:
py_socket = None
socket_wrapper = <SocketWrapper>socket.impl
socket_wrapper.connect_cb = cb
@ -125,10 +125,10 @@ cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
socket_wrapper.socket = py_socket
_spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
cdef void socket_destroy(grpc_custom_socket* socket) except * with gil:
cdef void socket_destroy(grpc_custom_socket* socket) with gil:
cpython.Py_DECREF(<SocketWrapper>socket.impl)
cdef void socket_shutdown(grpc_custom_socket* socket) except * with gil:
cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
try:
(<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
except IOError as io_error:
@ -136,7 +136,7 @@ cdef void socket_shutdown(grpc_custom_socket* socket) except * with gil:
raise io_error
cdef void socket_close(grpc_custom_socket* socket,
grpc_custom_close_callback cb) except * with gil:
grpc_custom_close_callback cb) with gil:
socket_wrapper = (<SocketWrapper>socket.impl)
if socket_wrapper.socket is not None:
socket_wrapper.socket.close()
@ -176,7 +176,7 @@ def socket_write_async(socket_wrapper, write_bytes):
socket_write_async_cython(socket_wrapper, write_bytes)
cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
grpc_custom_write_callback cb) except * with gil:
grpc_custom_write_callback cb) with gil:
cdef char* start
sw = <SocketWrapper>socket.impl
sw.write_cb = cb
@ -204,7 +204,7 @@ def socket_read_async(socket_wrapper):
socket_read_async_cython(socket_wrapper)
cdef void socket_read(grpc_custom_socket* socket, char* buffer,
size_t length, grpc_custom_read_callback cb) except * with gil:
size_t length, grpc_custom_read_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.read_cb = cb
sw.c_buffer = buffer
@ -213,7 +213,7 @@ cdef void socket_read(grpc_custom_socket* socket, char* buffer,
cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) except * with gil:
int* length) with gil:
cdef char* src_buf
peer = (<SocketWrapper>socket.impl).socket.getpeername()
@ -226,7 +226,7 @@ cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
int* length) except * with gil:
int* length) with gil:
cdef char* src_buf
cdef grpc_resolved_address c_addr
if (<SocketWrapper>socket.impl).socket is None:
@ -245,7 +245,7 @@ def applysockopts(s):
cdef grpc_error* socket_bind(grpc_custom_socket* socket,
const grpc_sockaddr* addr,
size_t len, int flags) except * with gil:
size_t len, int flags) with gil:
addr_tuple = sockaddr_to_tuple(addr, len)
try:
try:
@ -262,7 +262,7 @@ cdef grpc_error* socket_bind(grpc_custom_socket* socket,
else:
return grpc_error_none()
cdef grpc_error* socket_listen(grpc_custom_socket* socket) except * with gil:
cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
(<SocketWrapper>socket.impl).socket.listen(50)
return grpc_error_none()
@ -292,7 +292,7 @@ def socket_accept_async(s):
accept_callback_cython(s)
cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
grpc_custom_accept_callback cb) except * with gil:
grpc_custom_accept_callback cb) with gil:
sw = <SocketWrapper>socket.impl
sw.accepting_socket = client
sw.accept_cb = cb
@ -322,7 +322,7 @@ cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
def socket_resolve_async_python(resolve_wrapper):
socket_resolve_async_cython(resolve_wrapper)
cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) except * with gil:
cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil:
rw = ResolveWrapper()
rw.c_resolver = r
rw.c_host = host
@ -330,7 +330,7 @@ cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port)
_spawn_greenlet(socket_resolve_async_python, rw)
cdef grpc_error* socket_resolve(char* host, char* port,
grpc_resolved_addresses** res) except * with gil:
grpc_resolved_addresses** res) with gil:
try:
result = gevent_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)
@ -360,13 +360,13 @@ cdef class TimerWrapper:
self.event.set()
self.timer.stop()
cdef void timer_start(grpc_custom_timer* t) except * with gil:
cdef void timer_start(grpc_custom_timer* t) with gil:
timer = TimerWrapper(t.timeout_ms / 1000.0)
timer.c_timer = t
t.timer = <void*>timer
timer.start()
cdef void timer_stop(grpc_custom_timer* t) except * with gil:
cdef void timer_stop(grpc_custom_timer* t) with gil:
time_wrapper = <object>t.timer
time_wrapper.stop()
@ -374,16 +374,16 @@ cdef void timer_stop(grpc_custom_timer* t) except * with gil:
### pollset implementation ###
###############################
cdef void init_loop() except * with gil:
cdef void init_loop() with gil:
pass
cdef void destroy_loop() except * with gil:
cdef void destroy_loop() with gil:
g_pool.join()
cdef void kick_loop() except * with gil:
cdef void kick_loop() with gil:
g_event.set()
cdef void run_loop(size_t timeout_ms) except * with gil:
cdef void run_loop(size_t timeout_ms) with gil:
timeout = timeout_ms / 1000.0
if timeout_ms > 0:
g_event.wait(timeout)

Loading…
Cancel
Save