@ -33,6 +33,9 @@ require 'grpc/generic/service'
require 'thread'
require 'xray/thread_dump_signal_handler'
# A global that contains signals the gRPC servers should respond to.
$grpc_signals = [ ]
# GRPC contains the General RPC module.
module GRPC
# RpcServer hosts a number of services and makes them available on the
@ -50,6 +53,23 @@ module GRPC
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
DEFAULT_POLL_PERIOD = 1
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0 . 25
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def self . trap_signals
%w( INT TERM ) . each { | sig | trap ( sig ) { $grpc_signals << sig } }
end
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
@ -79,7 +99,7 @@ module GRPC
# with not available to new requests
def initialize ( pool_size : DEFAULT_POOL_SIZE ,
max_waiting_requests : DEFAULT_MAX_WAITING_REQUESTS ,
poll_period : INFINITE_FUTURE ,
poll_period : DEFAULT_POLL_PERIOD ,
completion_queue_override : nil ,
server_override : nil ,
** kw )
@ -117,6 +137,13 @@ module GRPC
return unless @running
@stopped = true
@pool . stop
# TODO: uncomment this:
#
# This segfaults in the c layer, so its commented out for now. Shutdown
# still occurs, but the c layer has to do the cleanup.
#
# @server.close
end
# determines if the server is currently running
@ -139,7 +166,37 @@ module GRPC
running?
end
# determines if the server is currently stopped
# Runs the server in its own thread, then waits for signal INT or TERM on
# the current thread to terminate it.
def run_till_terminated
self . class . trap_signals
t = Thread . new { run }
wait_till_running
loop do
sleep SIGNAL_CHECK_PERIOD
break unless handle_signals
end
stop
t . join
end
# Handles the signals in $grpc_signals.
#
# @return false if the server should exit, true if not.
def handle_signals
loop do
sig = $grpc_signals . shift
case sig
when 'INT'
return false
when 'TERM'
return false
end
end
true
end
# Determines if the server is currently stopped
def stopped?
@stopped || = false
end
@ -265,7 +322,10 @@ module GRPC
# Pool is a simple thread pool for running server requests.
class Pool
def initialize ( size )
# Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize ( size , keep_alive : DEFAULT_KEEP_ALIVE )
fail 'pool size must be positive' unless size > 0
@jobs = Queue . new
@size = size
@ -273,6 +333,7 @@ module GRPC
@stop_mutex = Mutex . new
@stop_cond = ConditionVariable . new
@workers = [ ]
@keep_alive = keep_alive
end
# Returns the number of jobs waiting
@ -325,7 +386,6 @@ module GRPC
@workers . size . times { schedule { throw :exit } }
@stopped = true
# TODO: allow configuration of the keepalive period
keep_alive = 5
@stop_mutex . synchronize do
@stop_cond . wait ( @stop_mutex , keep_alive ) if @workers . size > 0
@ -333,7 +393,7 @@ module GRPC
# Forcibly shutdown any threads that are still alive.
if @workers . size > 0
logger . warn ( " forcibly terminating #{ @workers . size } worker(s) " )
logger . info ( " forcibly terminating #{ @workers . size } worker(s) " )
@workers . each do | t |
next unless t . alive?
begin
@ -344,7 +404,6 @@ module GRPC
end
end
end
logger . info ( 'stopped, all workers are shutdown' )
end
end