use thread pool from v1.0.x but keep server call life cycle in master

pull/8825/head
Alexander Polcyn 8 years ago committed by Jan Tattermusch
parent 8613e474cb
commit cdd4203042
  1. 1
      grpc.gemspec
  2. 155
      src/ruby/lib/grpc/generic/rpc_server.rb

@ -29,7 +29,6 @@ Gem::Specification.new do |s|
s.add_dependency 'google-protobuf', '~> 3.0.2'
s.add_dependency 'googleauth', '~> 0.5.1'
s.add_dependency 'concurrent-ruby'
s.add_development_dependency 'bundler', '~> 1.9'
s.add_development_dependency 'facter', '~> 2.4'

@ -31,10 +31,132 @@ require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'
require 'concurrent'
# GRPC contains the General RPC module.
module GRPC
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new # needs to be held when accessing @stopped
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
# Each worker thread has its own queue to push and pull jobs
# these queues are put into @ready_queues when that worker is idle
@ready_workers = Queue.new
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
def ready_for_work?
# Busy worker threads are either doing work, or have a single job
# waiting on them. Workers that are idle with no jobs waiting
# have their "queues" in @ready_workers
!@ready_workers.empty?
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
return if blk.nil?
@stop_mutex.synchronize do
if @stopped
GRPC.logger.warn('did not schedule job, already stopped')
return
end
GRPC.logger.info('schedule another job')
fail 'No worker threads available' if @ready_workers.empty?
worker_queue = @ready_workers.pop
fail 'worker already has a task waiting' unless worker_queue.empty?
worker_queue << [blk, args]
end
end
# Starts running the jobs in the thread pool.
def start
@stop_mutex.synchronize do
fail 'already stopped' if @stopped
end
until @workers.size == @size.to_i
new_worker_queue = Queue.new
@ready_workers << new_worker_queue
next_thread = Thread.new(new_worker_queue) do |jobs|
catch(:exit) do # allows { throw :exit } to kill a thread
loop_execute_jobs(jobs)
end
remove_current_thread
end
@workers << next_thread
end
end
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
schedule { throw :exit } while ready_for_work?
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
GRPC.logger.info('stopped, all workers are shutdown')
end
protected
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
GRPC.logger.warn('error while terminating a worker')
GRPC.logger.warn(e)
end
end
end
# removes the threads from workers, and signal when all the
# threads are complete.
def remove_current_thread
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size.zero?
end
end
def loop_execute_jobs(worker_queue)
loop do
begin
blk, args = worker_queue.pop
blk.call(*args)
rescue StandardError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
# there shouldn't be any work given to this thread while its busy
fail('received a task while busy') unless worker_queue.empty?
@ready_workers << worker_queue
end
end
end
# RpcServer hosts a number of services and makes them available on the
# network.
@ -45,17 +167,13 @@ module GRPC
def_delegators :@server, :add_http2_port
# Default max size of the thread pool size is 100
DEFAULT_MAX_POOL_SIZE = 100
# Default minimum size of the thread pool is 5
DEFAULT_MIN_POOL_SIZE = 5
# Default thread pool size is 30
DEFAULT_POOL_SIZE = 30
# Deprecated due to internal changes to the thread pool
DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
# Used for grpc server shutdown and thread pool shutdown timeouts
DEFAULT_POLL_PERIOD = 1
# Signal check period is 0.25s
@ -76,9 +194,9 @@ module GRPC
# There are some specific keyword args used to configure the RpcServer
# instance.
#
# * pool_size: the maximum size of the thread pool that the server's
# thread pool can reach. No more concurrent requests can be made than
# the size of the thread pool
# * pool_size: the size of the thread pool the server uses to run its
# threads. No more concurrent requests can be made than the size
# of the thread pool
#
# * max_waiting_requests: Deprecated due to internal changes to the thread
# pool. This is still an argument for compatibility but is ignored.
@ -93,8 +211,7 @@ module GRPC
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
min_pool_size:DEFAULT_MIN_POOL_SIZE,
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
connect_md_proc:nil,
@ -102,12 +219,8 @@ module GRPC
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: [min_pool_size, pool_size].min,
max_threads: pool_size,
max_queue: max_waiting_requests,
fallback_policy: :discard)
@pool_size = pool_size
@pool = Pool.new(@pool_size)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
# running_state can take 4 values: :not_started, :running, :stopping, and
@ -128,8 +241,7 @@ module GRPC
end
deadline = from_relative_time(@poll_period)
@server.close(deadline)
@pool.shutdown
@pool.wait_for_termination(@poll_period)
@pool.stop
end
def running_state
@ -226,6 +338,7 @@ module GRPC
def run
@run_mutex.synchronize do
fail 'cannot run without registering services' if rpc_descs.size.zero?
@pool.start
@server.start
transition_running_state(:running)
@run_cond.broadcast
@ -273,7 +386,7 @@ module GRPC
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
@pool.post(active_call) do |ac|
@pool.schedule(active_call) do |ac|
c, mth = ac
begin
rpc_descs[mth].run_server_method(c, rpc_handlers[mth])

Loading…
Cancel
Save