Merge pull request #5678 from murgatroid99/ruby_synchronization_fixes

Ruby: fix some synchronization code in server implementation
pull/5637/head^2
Nicolas Noble 9 years ago
commit fbdf51b07e
  1. 3
      src/ruby/.rubocop.yml
  2. 97
      src/ruby/lib/grpc/generic/rpc_server.rb
  3. 25
      src/ruby/spec/generic/rpc_server_spec.rb

@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity:
Metrics/PerceivedComplexity:
Max: 8
Metrics/ClassLength:
Max: 250

@ -107,7 +107,9 @@ module GRPC
# Starts running the jobs in the thread pool.
def start
fail 'already stopped' if @stopped
@stop_mutex.synchronize do
fail 'already stopped' if @stopped
end
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
@ -264,10 +266,10 @@ module GRPC
@pool = Pool.new(@pool_size)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
@running = false
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
@server = RpcServer.setup_srv(server_override, @cq, **kw)
@stopped = false
@stop_mutex = Mutex.new
end
# stops a running server
@ -275,27 +277,42 @@ module GRPC
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
return unless @running
@stop_mutex.synchronize do
@stopped = true
@run_mutex.synchronize do
fail 'Cannot stop before starting' if @running_state == :not_started
return if @running_state != :running
transition_running_state(:stopping)
end
deadline = from_relative_time(@poll_period)
return if @server.close(@cq, deadline)
deadline = from_relative_time(@poll_period)
@server.close(@cq, deadline)
@pool.stop
end
# determines if the server has been stopped
def stopped?
@stop_mutex.synchronize do
return @stopped
def running_state
@run_mutex.synchronize do
return @running_state
end
end
# Can only be called while holding @run_mutex
def transition_running_state(target_state)
state_transitions = {
not_started: :running,
running: :stopping,
stopping: :stopped
}
if state_transitions[@running_state] == target_state
@running_state = target_state
else
fail "Bad server state transition: #{@running_state}->#{target_state}"
end
end
# determines if the server is currently running
def running?
@running
running_state == :running
end
def stopped?
running_state == :stopped
end
# Is called from other threads to wait for #run to start up the server.
@ -304,13 +321,11 @@ module GRPC
#
# @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = 0.1)
end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
while Time.now < end_time
@run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
sleep(sleep_period)
def wait_till_running(timeout = nil)
@run_mutex.synchronize do
@run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
return @running_state == :running
end
running?
end
# Runs the server in its own thread, then waits for signal INT or TERM on
@ -360,11 +375,14 @@ module GRPC
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
fail 'cannot add services if the server is running' if running?
fail 'cannot add services if the server is stopped' if stopped?
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
@run_mutex.synchronize do
unless @running_state == :not_started
fail 'cannot add services if the server has been started'
end
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
end
end
# runs the server
@ -375,16 +393,13 @@ module GRPC
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def run
if rpc_descs.size.zero?
GRPC.logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@running = true
@run_cond.signal
fail 'cannot run without registering services' if rpc_descs.size.zero?
@pool.start
@server.start
transition_running_state(:running)
@run_cond.broadcast
end
@pool.start
@server.start
loop_handle_server_calls
end
@ -413,9 +428,9 @@ module GRPC
# handles calls to the server
def loop_handle_server_calls
fail 'not running' unless @running
fail 'not started' if running_state == :not_started
loop_tag = Object.new
until stopped?
while running_state == :running
begin
an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
break if (!an_rpc.nil?) && an_rpc.call.nil?
@ -430,11 +445,14 @@ module GRPC
rescue Core::CallError, RuntimeError => e
# these might happen for various reasonse. The correct behaviour of
# the server is to log them and continue, if it's not shutting down.
GRPC.logger.warn("server call failed: #{e}") unless stopped?
if running_state == :running
GRPC.logger.warn("server call failed: #{e}")
end
next
end
end
@running = false
# @running_state should be :stopping here
@run_mutex.synchronize { transition_running_state(:stopped) }
GRPC.logger.info("stopped: #{self}")
end
@ -484,9 +502,10 @@ module GRPC
cls.assert_rpc_descs_have_methods
end
# This should be called while holding @run_mutex
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
specs, handlers = rpc_descs, rpc_handlers
specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
fail "already registered: rpc #{route} from #{spec}" if specs.key? route

@ -1,4 +1,4 @@
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -220,19 +220,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**opts)
end
after(:each) do
@srv.stop
end
it 'starts out false' do
expect(@srv.stopped?).to be(false)
end
it 'stays false after a #stop is called before #run' do
@srv.stop
expect(@srv.stopped?).to be(false)
end
it 'stays false after the server starts running', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@ -247,8 +238,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
@srv.stop
expect(@srv.stopped?).to be(true)
t.join
expect(@srv.stopped?).to be(true)
end
end
@ -266,9 +257,7 @@ describe GRPC::RpcServer do
server_override: @server
}
r = RpcServer.new(**opts)
r.run
expect(r.running?).to be(false)
r.stop
expect { r.run }.to raise_error(RuntimeError)
end
it 'is true after run is called with a registered service' do
@ -293,10 +282,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**@opts)
end
after(:each) do
@srv.stop
end
it 'raises if #run has already been called' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@ -528,10 +513,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
after(:each) do
@srv.stop
end
it 'should be added to BadStatus when requests fail', server: true do
service = FailingService.new
@srv.handle(service)

Loading…
Cancel
Save