Merge pull request #6873 from kpayson64/python_signal_handling

Added check for signals in poll()
pull/6908/head
Jan Tattermusch 9 years ago committed by GitHub
commit fd1419a176
  1. 3
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
  2. 59
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  3. 6
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  4. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi

@ -31,9 +31,6 @@
cdef class CompletionQueue: cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue cdef grpc_completion_queue *c_completion_queue
cdef object pluck_condition
cdef int num_plucking
cdef int num_polling
cdef bint is_shutting_down cdef bint is_shutting_down
cdef bint is_shutdown cdef bint is_shutdown

@ -32,6 +32,8 @@ cimport cpython
import threading import threading
import time import time
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
cdef class CompletionQueue: cdef class CompletionQueue:
@ -40,9 +42,6 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL) self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False self.is_shutting_down = False
self.is_shutdown = False self.is_shutdown = False
self.pluck_condition = threading.Condition()
self.num_plucking = 0
self.num_polling = 0
cdef _interpret_event(self, grpc_event event): cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None cdef OperationTag tag = None
@ -83,45 +82,27 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None): def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for # We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__). # 'special' methods (like next and __next__).
cdef gpr_timespec c_increment
cdef gpr_timespec c_timeout
cdef gpr_timespec c_deadline cdef gpr_timespec c_deadline
with nogil: with nogil:
c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None: if deadline is not None:
c_deadline = deadline.c_time c_deadline = deadline.c_time
cdef grpc_event event
while True:
# Poll within a critical section to detect contention c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
with self.pluck_condition: if gpr_time_cmp(c_timeout, c_deadline) > 0:
assert self.num_plucking == 0, 'cannot simultaneously pluck and poll' c_timeout = c_deadline
self.num_polling += 1 event = grpc_completion_queue_next(
with nogil: self.c_completion_queue, c_timeout, NULL)
event = grpc_completion_queue_next( if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
self.c_completion_queue, c_deadline, NULL) break;
with self.pluck_condition:
self.num_polling -= 1 # Handle any signals
return self._interpret_event(event) with gil:
cpython.PyErr_CheckSignals()
def pluck(self, OperationTag tag, Timespec deadline=None):
# Plucking a 'None' tag is equivalent to passing control to GRPC core until
# the deadline.
cdef gpr_timespec c_deadline = gpr_inf_future(
GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
# Pluck within a critical section to detect contention
with self.pluck_condition:
assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
'cannot pluck more than {} times simultaneously'.format(
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
self.num_plucking += 1
with nogil:
event = grpc_completion_queue_pluck(
self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
with self.pluck_condition:
self.num_plucking -= 1
return self._interpret_event(event) return self._interpret_event(event)
def shutdown(self): def shutdown(self):

@ -80,6 +80,12 @@ cdef extern from "grpc/_cython/loader.h":
gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_timespec gpr_convert_clock_type(gpr_timespec t,
gpr_clock_type target_clock) nogil gpr_clock_type target_clock) nogil
gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil
int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil
ctypedef enum grpc_status_code: ctypedef enum grpc_status_code:
GRPC_STATUS_OK GRPC_STATUS_OK
GRPC_STATUS_CANCELLED GRPC_STATUS_CANCELLED

@ -99,7 +99,7 @@ cdef class Server:
with nogil: with nogil:
grpc_server_start(self.c_server) grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work # Ensure the core has gotten a chance to do the start-up work
self.backup_shutdown_queue.pluck(None, Timespec(None)) self.backup_shutdown_queue.poll(Timespec(None))
def add_http2_port(self, address, def add_http2_port(self, address,
ServerCredentials server_credentials=None): ServerCredentials server_credentials=None):

Loading…
Cancel
Save