|
|
@ -52,28 +52,31 @@ describe GRPC::Pool do |
|
|
|
expect(p.ready_for_work?).to be(false) |
|
|
|
expect(p.ready_for_work?).to be(false) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
it 'it stops being ready after all workers jobs waiting or running' do |
|
|
|
it 'it stops being ready after all workers are busy' do |
|
|
|
p = Pool.new(5) |
|
|
|
p = Pool.new(5) |
|
|
|
p.start |
|
|
|
p.start |
|
|
|
job = proc { sleep(3) } # sleep so workers busy when done scheduling |
|
|
|
|
|
|
|
5.times do |
|
|
|
wait_mu = Mutex.new |
|
|
|
expect(p.ready_for_work?).to be(true) |
|
|
|
wait_cv = ConditionVariable.new |
|
|
|
p.schedule(&job) |
|
|
|
wait = true |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
job = proc do |
|
|
|
|
|
|
|
wait_mu.synchronize do |
|
|
|
|
|
|
|
wait_cv.wait(wait_mu) while wait |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
expect(p.ready_for_work?).to be(false) |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
it 'it becomes ready again after jobs complete' do |
|
|
|
|
|
|
|
p = Pool.new(5) |
|
|
|
|
|
|
|
p.start |
|
|
|
|
|
|
|
job = proc {} |
|
|
|
|
|
|
|
5.times do |
|
|
|
5.times do |
|
|
|
expect(p.ready_for_work?).to be(true) |
|
|
|
expect(p.ready_for_work?).to be(true) |
|
|
|
p.schedule(&job) |
|
|
|
p.schedule(&job) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
expect(p.ready_for_work?).to be(false) |
|
|
|
expect(p.ready_for_work?).to be(false) |
|
|
|
sleep 5 # give the pool time do get at least one task done |
|
|
|
|
|
|
|
expect(p.ready_for_work?).to be(true) |
|
|
|
wait_mu.synchronize do |
|
|
|
|
|
|
|
wait = false |
|
|
|
|
|
|
|
wait_cv.broadcast |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
@ -105,13 +108,20 @@ describe GRPC::Pool do |
|
|
|
it 'stops jobs when there are long running jobs' do |
|
|
|
it 'stops jobs when there are long running jobs' do |
|
|
|
p = Pool.new(1) |
|
|
|
p = Pool.new(1) |
|
|
|
p.start |
|
|
|
p.start |
|
|
|
o, q = Object.new, Queue.new |
|
|
|
|
|
|
|
|
|
|
|
wait_forever_mu = Mutex.new |
|
|
|
|
|
|
|
wait_forever_cv = ConditionVariable.new |
|
|
|
|
|
|
|
wait_forever = true |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
job_running = Queue.new |
|
|
|
job = proc do |
|
|
|
job = proc do |
|
|
|
sleep(5) # long running |
|
|
|
job_running.push(Object.new) |
|
|
|
q.push(o) |
|
|
|
wait_forever_mu.synchronize do |
|
|
|
|
|
|
|
wait_forever_cv.wait while wait_forever |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
p.schedule(&job) |
|
|
|
p.schedule(&job) |
|
|
|
sleep(1) # should ensure the long job gets scheduled |
|
|
|
job_running.pop |
|
|
|
expect { p.stop }.not_to raise_error |
|
|
|
expect { p.stop }.not_to raise_error |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|