From 9558460b77d12deca7db0ecb2a43bf87a846baba Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 16 Apr 2015 12:53:07 -0700 Subject: [PATCH] Adds a #run_till_terminated func to RpcServer - adds signal handling to RpcServer - ensures that very little behaviour occurs in the signal handling context - adds #run_till_terminated that launches a server in a thread and then handles signals that can be used to stop it. --- src/ruby/.rubocop_todo.yml | 8 +-- src/ruby/lib/grpc/generic/rpc_server.rb | 71 ++++++++++++++++++++++--- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index b4d66c517c2..02136a81a91 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,18 +1,18 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-15 18:43:23 -0700 using RuboCop version 0.30.0. +# on 2015-04-16 12:30:09 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 32 +# Offense count: 34 Metrics/AbcSize: Max: 36 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 183 + Max: 185 # Offense count: 35 # Configuration parameters: CountComments. @@ -24,7 +24,7 @@ Metrics/MethodLength: Metrics/ParameterLists: Max: 8 -# Offense count: 6 +# Offense count: 9 # Configuration parameters: AllowedVariables. Style/GlobalVars: Enabled: false diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 30a4bf15325..fa21d770806 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -33,6 +33,9 @@ require 'grpc/generic/service' require 'thread' require 'xray/thread_dump_signal_handler' +# A global that contains signals the gRPC servers should respond to. +$grpc_signals = [] + # GRPC contains the General RPC module. module GRPC # RpcServer hosts a number of services and makes them available on the @@ -50,6 +53,23 @@ module GRPC # Default max_waiting_requests size is 20 DEFAULT_MAX_WAITING_REQUESTS = 20 + # Default poll period is 1s + DEFAULT_POLL_PERIOD = 1 + + # Signal check period is 0.25s + SIGNAL_CHECK_PERIOD = 0.25 + + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def self.trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -79,7 +99,7 @@ module GRPC # with not available to new requests def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:INFINITE_FUTURE, + poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, **kw) @@ -117,6 +137,13 @@ module GRPC return unless @running @stopped = true @pool.stop + + # TODO: uncomment this: + # + # This segfaults in the c layer, so its commented out for now. Shutdown + # still occurs, but the c layer has to do the cleanup. + # + # @server.close end # determines if the server is currently running @@ -139,7 +166,37 @@ module GRPC running? end - # determines if the server is currently stopped + # Runs the server in its own thread, then waits for signal INT or TERM on + # the current thread to terminate it. + def run_till_terminated + self.class.trap_signals + t = Thread.new { run } + wait_till_running + loop do + sleep SIGNAL_CHECK_PERIOD + break unless handle_signals + end + stop + t.join + end + + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + + # Determines if the server is currently stopped def stopped? @stopped ||= false end @@ -265,7 +322,10 @@ module GRPC # Pool is a simple thread pool for running server requests. class Pool - def initialize(size) + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) fail 'pool size must be positive' unless size > 0 @jobs = Queue.new @size = size @@ -273,6 +333,7 @@ module GRPC @stop_mutex = Mutex.new @stop_cond = ConditionVariable.new @workers = [] + @keep_alive = keep_alive end # Returns the number of jobs waiting @@ -325,7 +386,6 @@ module GRPC @workers.size.times { schedule { throw :exit } } @stopped = true - # TODO: allow configuration of the keepalive period keep_alive = 5 @stop_mutex.synchronize do @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 @@ -333,7 +393,7 @@ module GRPC # Forcibly shutdown any threads that are still alive. if @workers.size > 0 - logger.warn("forcibly terminating #{@workers.size} worker(s)") + logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin @@ -344,7 +404,6 @@ module GRPC end end end - logger.info('stopped, all workers are shutdown') end end