Refactor: Move the Pool out from RpcServer

pull/1309/head
Tim Emiola 10 years ago
parent bf6d78c8e4
commit e6be7f31e4
  1. 184
      src/ruby/lib/grpc/generic/rpc_server.rb
  2. 4
      src/ruby/spec/generic/rpc_server_pool_spec.rb

@ -38,6 +38,103 @@ $grpc_signals = []
# GRPC contains the General RPC module.
module GRPC
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
end
# Starts running the jobs in the thread pool.
def start
fail 'already stopped' if @stopped
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
loop_execute_jobs
end
remove_current_thread
end
@workers << next_thread
end
end
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
logger.info('stopped, all workers are shutdown')
end
protected
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
end
end
end
# removes the threads from workers, and signal when all the
# threads are complete.
def remove_current_thread
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0
end
end
def loop_execute_jobs
loop do
begin
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
end
end
end
end
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
@ -320,93 +417,6 @@ module GRPC
an_rpc.deadline)
end
# Pool is a simple thread pool for running server requests.
class Pool
# Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
end
# Starts running the jobs in the thread pool.
def start
fail 'already stopped' if @stopped
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
loop do
begin
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
end
end
end
# removes the threads from workers, and signal when all the
# threads are complete.
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0
end
end
@workers << next_thread
end
end
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
# Forcibly shutdown any threads that are still alive.
if @workers.size > 0
logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
end
end
end
logger.info('stopped, all workers are shutdown')
end
end
protected
def rpc_descs

@ -30,9 +30,9 @@
require 'grpc'
require 'xray/thread_dump_signal_handler'
Pool = GRPC::RpcServer::Pool
describe GRPC::Pool do
Pool = GRPC::Pool
describe Pool do
describe '#new' do
it 'raises if a non-positive size is used' do
expect { Pool.new(0) }.to raise_error

Loading…
Cancel
Save