Merge pull request #8632 from apolcyn/fix_ruby_deadlock

remove wait queue from ruby thread pool to avoid possibility of deadlock
pull/8818/head
Vijay Pai 8 years ago committed by GitHub
commit aa50f2e6a7
  1. 52
      src/ruby/lib/grpc/generic/rpc_server.rb
  2. 62
      src/ruby/spec/generic/rpc_server_pool_spec.rb

@ -48,6 +48,10 @@ module GRPC
@stop_cond = ConditionVariable.new @stop_cond = ConditionVariable.new
@workers = [] @workers = []
@keep_alive = keep_alive @keep_alive = keep_alive
# Each worker thread has its own queue to push and pull jobs
# these queues are put into @ready_queues when that worker is idle
@ready_workers = Queue.new
end end
# Returns the number of jobs waiting # Returns the number of jobs waiting
@ -55,6 +59,13 @@ module GRPC
@jobs.size @jobs.size
end end
def ready_for_work?
# Busy worker threads are either doing work, or have a single job
# waiting on them. Workers that are idle with no jobs waiting
# have their "queues" in @ready_workers
!@ready_workers.empty?
end
# Runs the given block on the queue with the provided args. # Runs the given block on the queue with the provided args.
# #
# @param args the args passed blk when it is called # @param args the args passed blk when it is called
@ -67,7 +78,11 @@ module GRPC
return return
end end
GRPC.logger.info('schedule another job') GRPC.logger.info('schedule another job')
@jobs << [blk, args] fail 'No worker threads available' if @ready_workers.empty?
worker_queue = @ready_workers.pop
fail 'worker already has a task waiting' unless worker_queue.empty?
worker_queue << [blk, args]
end end
end end
@ -77,9 +92,11 @@ module GRPC
fail 'already stopped' if @stopped fail 'already stopped' if @stopped
end end
until @workers.size == @size.to_i until @workers.size == @size.to_i
next_thread = Thread.new do new_worker_queue = Queue.new
@ready_workers << new_worker_queue
next_thread = Thread.new(new_worker_queue) do |jobs|
catch(:exit) do # allows { throw :exit } to kill a thread catch(:exit) do # allows { throw :exit } to kill a thread
loop_execute_jobs loop_execute_jobs(jobs)
end end
remove_current_thread remove_current_thread
end end
@ -90,7 +107,7 @@ module GRPC
# Stops the jobs in the pool # Stops the jobs in the pool
def stop def stop
GRPC.logger.info('stopping, will wait for all the workers to exit') GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } } schedule { throw :exit } while ready_for_work?
@stop_mutex.synchronize do # wait @keep_alive for works to stop @stop_mutex.synchronize do # wait @keep_alive for works to stop
@stopped = true @stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
@ -125,15 +142,18 @@ module GRPC
end end
end end
def loop_execute_jobs def loop_execute_jobs(worker_queue)
loop do loop do
begin begin
blk, args = @jobs.pop blk, args = worker_queue.pop
blk.call(*args) blk.call(*args)
rescue StandardError => e rescue StandardError => e
GRPC.logger.warn('Error in worker thread') GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e) GRPC.logger.warn(e)
end end
# there shouldn't be any work given to this thread while its busy
fail('received a task while busy') unless worker_queue.empty?
@ready_workers << worker_queue
end end
end end
end end
@ -147,10 +167,10 @@ module GRPC
def_delegators :@server, :add_http2_port def_delegators :@server, :add_http2_port
# Default thread pool size is 3 # Default thread pool size is 30
DEFAULT_POOL_SIZE = 3 DEFAULT_POOL_SIZE = 30
# Default max_waiting_requests size is 20 # Deprecated due to internal changes to the thread pool
DEFAULT_MAX_WAITING_REQUESTS = 20 DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s # Default poll period is 1s
@ -175,11 +195,11 @@ module GRPC
# instance. # instance.
# #
# * pool_size: the size of the thread pool the server uses to run its # * pool_size: the size of the thread pool the server uses to run its
# threads # threads. No more concurrent requests can be made than the size
# of the thread pool
# #
# * max_waiting_requests: the maximum number of requests that are not # * max_waiting_requests: Deprecated due to internal changes to the thread
# being handled to allow. When this limit is exceeded, the server responds # pool. This is still an argument for compatibility but is ignored.
# with not available to new requests
# #
# * poll_period: when present, the server polls for new events with this # * poll_period: when present, the server polls for new events with this
# period # period
@ -330,10 +350,8 @@ module GRPC
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc) def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests return an_rpc if @pool.ready_for_work?
GRPC.logger.info("waiting: #{jobs_count}, max: #{max}") GRPC.logger.warn('no free worker threads currently')
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x } noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true) metadata_received: true)

@ -29,6 +29,8 @@
require 'grpc' require 'grpc'
Thread.abort_on_exception = true
describe GRPC::Pool do describe GRPC::Pool do
Pool = GRPC::Pool Pool = GRPC::Pool
@ -44,32 +46,34 @@ describe GRPC::Pool do
end end
end end
describe '#jobs_waiting' do describe '#ready_for_work?' do
it 'at start, it is zero' do it 'before start it is not ready' do
p = Pool.new(1) p = Pool.new(1)
expect(p.jobs_waiting).to be(0) expect(p.ready_for_work?).to be(false)
end end
it 'it increases, with each scheduled job if the pool is not running' do it 'it stops being ready after all workers jobs waiting or running' do
p = Pool.new(1) p = Pool.new(5)
job = proc {} p.start
expect(p.jobs_waiting).to be(0) job = proc { sleep(3) } # sleep so workers busy when done scheduling
5.times do |i| 5.times do
expect(p.ready_for_work?).to be(true)
p.schedule(&job) p.schedule(&job)
expect(p.jobs_waiting).to be(i + 1)
end end
expect(p.ready_for_work?).to be(false)
end end
it 'it decreases as jobs are run' do it 'it becomes ready again after jobs complete' do
p = Pool.new(1) p = Pool.new(5)
p.start
job = proc {} job = proc {}
expect(p.jobs_waiting).to be(0) 5.times do
3.times do expect(p.ready_for_work?).to be(true)
p.schedule(&job) p.schedule(&job)
end end
p.start expect(p.ready_for_work?).to be(false)
sleep 2 sleep 5 # give the pool time do get at least one task done
expect(p.jobs_waiting).to be(0) expect(p.ready_for_work?).to be(true)
end end
end end
@ -90,6 +94,18 @@ describe GRPC::Pool do
expect(q.pop).to be(o) expect(q.pop).to be(o)
p.stop p.stop
end end
it 'it throws an error if all of the workers have tasks to do' do
p = Pool.new(5)
p.start
job = proc {}
5.times do
expect(p.ready_for_work?).to be(true)
p.schedule(&job)
end
expect { p.schedule(&job) }.to raise_error
expect { p.schedule(&job) }.to raise_error
end
end end
describe '#stop' do describe '#stop' do
@ -113,18 +129,8 @@ describe GRPC::Pool do
end end
describe '#start' do describe '#start' do
it 'runs pre-scheduled jobs' do it 'runs jobs as they are scheduled' do
p = Pool.new(2) p = Pool.new(5)
o, q = Object.new, Queue.new
n = 5 # arbitrary
n.times { p.schedule(o, &q.method(:push)) }
p.start
n.times { expect(q.pop).to be(o) }
p.stop
end
it 'runs jobs as they are scheduled ' do
p = Pool.new(2)
o, q = Object.new, Queue.new o, q = Object.new, Queue.new
p.start p.start
n = 5 # arbitrary n = 5 # arbitrary

Loading…
Cancel
Save