Made signal handling properly handle non-killing signals

pull/6611/head
murgatroid99 9 years ago
parent 76733cf196
commit c0ecedba83
  1. 8
      src/ruby/ext/grpc/rb_call.c
  2. 62
      src/ruby/ext/grpc/rb_completion_queue.c
  3. 4
      src/ruby/ext/grpc/rb_completion_queue.h
  4. 12
      src/ruby/ext/grpc/rb_server.c

@ -722,10 +722,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
return result;
}
static void run_batch_unblock_func(void *call) {
grpc_call_cancel((grpc_call*)call, NULL);
}
/* call-seq:
cq = CompletionQueue.new
ops = {
@ -776,9 +772,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error_detail_of(err), err);
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout,
run_batch_unblock_func,
(void*)call);
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");

@ -52,21 +52,47 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void *tag;
volatile int interrupted;
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL.
   Polls in short (20ms) increments so that next_call->interrupted — set by
   the unblock function while the GVL is released — is observed promptly,
   instead of blocking for the entire caller-supplied timeout.
   NOTE(review): the displayed text also carried a leftover full-timeout
   blocking call to grpc_completion_queue_next before the loop; it has been
   removed here, since it would defeat the interruptible polling. */
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
  next_call_stack *const next_call = (next_call_stack*)param;
  gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
  gpr_timespec deadline;
  do {
    deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
    if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
      /* The overall timeout has expired */
      break;
    }
    next_call->event = grpc_completion_queue_next(next_call->cq,
                                                  deadline, NULL);
    if (next_call->event.success) {
      break;
    }
  } while (!next_call->interrupted);
  return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL.
   Polls in short (20ms) increments so that next_call->interrupted — set by
   the unblock function while the GVL is released — is observed promptly,
   instead of blocking for the entire caller-supplied timeout.
   NOTE(review): the displayed text also carried a leftover full-timeout
   blocking call to grpc_completion_queue_pluck before the loop; it has been
   removed here, since it would defeat the interruptible polling. */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
  next_call_stack *const next_call = (next_call_stack*)param;
  gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
  gpr_timespec deadline;
  do {
    deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
    if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
      /* The overall timeout has expired */
      break;
    }
    next_call->event = grpc_completion_queue_pluck(next_call->cq,
                                                   next_call->tag,
                                                   deadline, NULL);
    if (next_call->event.type != GRPC_QUEUE_TIMEOUT) {
      break;
    }
  } while (!next_call->interrupted);
  return NULL;
}
@ -139,12 +165,15 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
/* rb_thread_call_without_gvl unblocking function: flags the in-progress
   pluck/next polling loop as interrupted so it exits and the thread can
   re-acquire the GVL to let Ruby service the interrupt. */
static void unblock_func(void *param) {
  ((next_call_stack *)param)->interrupted = 1;
}
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout,
rb_unblock_function_t *ubf,
void *unblock_arg) {
VALUE timeout) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue,
@ -160,8 +189,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, ubf, unblock_arg);
/* Loop until we finish a pluck without an interruption. The internal
pluck function runs either until it is interrupted or it gets an
event, or time runs out.
The basic reason we need this relatively complicated construction is that
we need to re-acquire the GVL when an interrupt comes in, so that the ruby
interpreter can do what it needs to do with the interrupt. But we also need
to get back to plucking after the interrupt has been handled. */
do {
next_call.interrupted = 0;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, unblock_func,
(void *)&next_call);
/* If an interrupt prevented pluck from returning useful information, then
any plucks that did complete must have timed out */
} while (next_call.interrupted &&
next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}

@ -47,9 +47,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
* This avoids having code that holds the GIL repeated at multiple sites.
*/
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout,
rb_unblock_function_t *ubf,
void *unblock_arg);
VALUE timeout);
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();

@ -207,11 +207,6 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
grpc_call_details_destroy(&st->details);
}
/* Unblock function for request_call: begins server shutdown, which posts an
   event to the server's queue so the blocked pluck can return.
   NOTE(review): shutting the whole server down to unblock a single
   request_call is heavy-handed — presumably intentional here; confirm
   against callers before reusing. */
static void request_call_unblock_func(void *ptr) {
  grpc_rb_server *srv = (grpc_rb_server *)ptr;
  grpc_server_shutdown_and_notify(srv->wrapped, srv->queue, srv);
}
/* call-seq:
cq = CompletionQueue.new
tag = Object.new
@ -249,9 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout,
request_call_unblock_func,
(void*)s);
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st);
return Qnil;
@ -314,8 +307,7 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
if (s->wrapped != NULL) {
grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout,
NULL, NULL);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
if (!ev.success) {
rb_warn("server shutdown failed, cancelling the calls, objects may leak");
grpc_server_cancel_all_calls(s->wrapped);

Loading…
Cancel
Save