Handle signals properly when dropping GVL

pull/6611/head
murgatroid99 9 years ago
parent 1ba1bba66a
commit d595fb6557
  1. 8
      src/ruby/ext/grpc/rb_call.c
  2. 6
      src/ruby/ext/grpc/rb_completion_queue.c
  3. 6
      src/ruby/ext/grpc/rb_completion_queue.h
  4. 2
      src/ruby/ext/grpc/rb_grpc.c
  5. 14
      src/ruby/ext/grpc/rb_server.c
  6. 70
      src/ruby/ext/grpc/rb_signal.c
  7. 39
      src/ruby/ext/grpc/rb_signal.h
  8. 1
      src/ruby/lib/grpc.rb
  9. 69
      src/ruby/lib/grpc/signals.rb

@ -722,6 +722,10 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
return result;
}
static void run_batch_unblock_func(void *call) {
grpc_call_cancel((grpc_call*)call, NULL);
}
/* call-seq:
cq = CompletionQueue.new
ops = {
@ -772,7 +776,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error_detail_of(err), err);
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout,
run_batch_unblock_func,
(void*)call);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");

@ -142,7 +142,9 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout) {
VALUE timeout,
rb_unblock_function_t *ubf,
void *unblock_arg) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue,
@ -159,7 +161,7 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL);
(void *)&next_call, ubf, unblock_arg);
return next_call.event;
}

@ -46,8 +46,10 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
VALUE timeout);
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout,
rb_unblock_function_t *ubf,
void *unblock_arg);
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();

@ -50,7 +50,6 @@
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
#include "rb_signal.h"
static VALUE grpc_rb_cTimeVal = Qnil;
@ -333,7 +332,6 @@ void Init_grpc_c() {
Init_grpc_channel_credentials();
Init_grpc_server();
Init_grpc_server_credentials();
Init_grpc_signals();
Init_grpc_status_codes();
Init_grpc_time_consts();
}

@ -60,6 +60,7 @@ typedef struct grpc_rb_server {
VALUE mark;
/* The actual server */
grpc_server *wrapped;
grpc_completion_queue *queue;
} grpc_rb_server;
/* Destroys server instances. */
@ -145,6 +146,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
}
grpc_server_register_completion_queue(srv, cq, NULL);
wrapper->wrapped = srv;
wrapper->queue = cq;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be
GCed before the server */
@ -205,6 +207,11 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
grpc_call_details_destroy(&st->details);
}
static void request_call_unblock_func(void *ptr) {
grpc_rb_server *rb_srv = (grpc_rb_server*)ptr;
grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv);
}
/* call-seq:
cq = CompletionQueue.new
tag = Object.new
@ -242,7 +249,9 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout,
request_call_unblock_func,
(void*)s);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st);
return Qnil;
@ -305,7 +314,8 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
if (s->wrapped != NULL) {
grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout,
NULL, NULL);
if (!ev.success) {
rb_warn("server shutdown failed, cancelling the calls, objects may leak");
grpc_server_cancel_all_calls(s->wrapped);

@ -1,70 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <ruby/ruby.h>
#include <signal.h>
#include <stdbool.h>
#include <grpc/support/log.h>
#include "rb_grpc.h"
static void (*old_sigint_handler)(int);
static void (*old_sigterm_handler)(int);
static volatile bool signal_received = false;
/* This has to be handled at the C level instead of Ruby, because Ruby signal
* handlers are constrained to run in the main interpreter thread. If that main
* thread is blocked on grpc_completion_queue_pluck, the signal handlers will
* never run */
static void handle_signal(int signum) {
signal_received = true;
if (signum == SIGINT) {
old_sigint_handler(signum);
} else if (signum == SIGTERM) {
old_sigterm_handler(signum);
}
}
static VALUE grpc_rb_signal_received(VALUE self) {
(void)self;
return signal_received ? Qtrue : Qfalse;
}
void Init_grpc_signals() {
old_sigint_handler = signal(SIGINT, handle_signal);
old_sigterm_handler = signal(SIGTERM, handle_signal);
rb_define_singleton_method(grpc_rb_mGrpcCore, "signal_received?",
grpc_rb_signal_received, 0);
}

@ -1,39 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_RB_SIGNAL_H_
#define GRPC_RB_SIGNAL_H_
void Init_grpc_signals();
#endif /* GRPC_RB_SIGNAL_H_ */

@ -33,7 +33,6 @@ require_relative 'grpc/errors'
require_relative 'grpc/grpc'
require_relative 'grpc/logconfig'
require_relative 'grpc/notifier'
require_relative 'grpc/signals'
require_relative 'grpc/version'
require_relative 'grpc/core/time_consts'
require_relative 'grpc/generic/active_call'

@ -1,69 +0,0 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'thread'
require_relative 'grpc'
# GRPC contains the General RPC module.
module GRPC
# Signals contains gRPC functions related to signal handling
module Signals
@interpreter_exiting = false
@signal_handlers = []
@handlers_mutex = Mutex.new
def register_handler(&handler)
@handlers_mutex.synchronize do
@signal_handlers.push(handler)
handler.call if @exit_signal_received
end
# Returns a function to remove the handler
lambda do
@handlers_mutex.synchronize { @signal_handlers.delete(handler) }
end
end
module_function :register_handler
def wait_for_signals
t = Thread.new do
sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting
unless @interpreter_exiting
@handlers_mutex.synchronize do
@signal_handlers.each(&:call)
end
end
end
at_exit do
@interpreter_exiting = true
t.join
end
end
module_function :wait_for_signals
end
end
Loading…
Cancel
Save