diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index ff5cf8db831..d13ce426553 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity: Metrics/PerceivedComplexity: Max: 8 + +Metrics/ClassLength: + Max: 250 diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2997c9918..b30d19dd2bb 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -107,7 +107,9 @@ module GRPC # Starts running the jobs in the thread pool. def start - fail 'already stopped' if @stopped + @stop_mutex.synchronize do + fail 'already stopped' if @stopped + end until @workers.size == @size.to_i next_thread = Thread.new do catch(:exit) do # allows { throw :exit } to kill a thread @@ -264,10 +266,10 @@ module GRPC @pool = Pool.new(@pool_size) @run_cond = ConditionVariable.new @run_mutex = Mutex.new - @running = false + # running_state can take 4 values: :not_started, :running, :stopping, and + # :stopped. State transitions can only proceed in that order. + @running_state = :not_started @server = RpcServer.setup_srv(server_override, @cq, **kw) - @stopped = false - @stop_mutex = Mutex.new end # stops a running server @@ -275,27 +277,42 @@ module GRPC # the call has no impact if the server is already stopped, otherwise # server's current call loop is it's last. def stop - return unless @running - @stop_mutex.synchronize do - @stopped = true + @run_mutex.synchronize do + fail 'Cannot stop before starting' if @running_state == :not_started + return if @running_state != :running + transition_running_state(:stopping) end deadline = from_relative_time(@poll_period) - return if @server.close(@cq, deadline) - deadline = from_relative_time(@poll_period) @server.close(@cq, deadline) @pool.stop end - # determines if the server has been stopped - def stopped? - @stop_mutex.synchronize do - return @stopped + def running_state + @run_mutex.synchronize do + return @running_state + end + end + + # Can only be called while holding @run_mutex + def transition_running_state(target_state) + state_transitions = { + not_started: :running, + running: :stopping, + stopping: :stopped + } + if state_transitions[@running_state] == target_state + @running_state = target_state + else + fail "Bad server state transition: #{@running_state}->#{target_state}" end end - # determines if the server is currently running def running? - @running + running_state == :running + end + + def stopped? + running_state == :stopped end # Is called from other threads to wait for #run to start up the server. @@ -304,13 +321,11 @@ module GRPC # # @param timeout [Numeric] number of seconds to wait # @result [true, false] true if the server is running, false otherwise - def wait_till_running(timeout = 0.1) - end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 - while Time.now < end_time - @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? - sleep(sleep_period) + def wait_till_running(timeout = nil) + @run_mutex.synchronize do + @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started + return @running_state == :running end - running? end # Runs the server in its own thread, then waits for signal INT or TERM on @@ -360,11 +375,14 @@ module GRPC # @param service [Object|Class] a service class or object as described # above def handle(service) - fail 'cannot add services if the server is running' if running? - fail 'cannot add services if the server is stopped' if stopped? - cls = service.is_a?(Class) ? service : service.class - assert_valid_service_class(cls) - add_rpc_descs_for(service) + @run_mutex.synchronize do + unless @running_state == :not_started + fail 'cannot add services if the server has been started' + end + cls = service.is_a?(Class) ? service : service.class + assert_valid_service_class(cls) + add_rpc_descs_for(service) + end end # runs the server @@ -375,16 +393,13 @@ module GRPC # - #running? returns true after this is called, until #stop cause the # the server to stop. def run - if rpc_descs.size.zero? - GRPC.logger.warn('did not run as no services were present') - return - end @run_mutex.synchronize do - @running = true - @run_cond.signal + fail 'cannot run without registering services' if rpc_descs.size.zero? + @pool.start + @server.start + transition_running_state(:running) + @run_cond.broadcast end - @pool.start - @server.start loop_handle_server_calls end @@ -413,9 +428,9 @@ module GRPC # handles calls to the server def loop_handle_server_calls - fail 'not running' unless @running + fail 'not started' if running_state == :not_started loop_tag = Object.new - until stopped? + while running_state == :running begin an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) break if (!an_rpc.nil?) && an_rpc.call.nil? @@ -430,11 +445,14 @@ module GRPC rescue Core::CallError, RuntimeError => e # these might happen for various reasonse. The correct behaviour of # the server is to log them and continue, if it's not shutting down. - GRPC.logger.warn("server call failed: #{e}") unless stopped? + if running_state == :running + GRPC.logger.warn("server call failed: #{e}") + end next end end - @running = false + # @running_state should be :stopping here + @run_mutex.synchronize { transition_running_state(:stopped) } GRPC.logger.info("stopped: #{self}") end @@ -484,9 +502,10 @@ module GRPC cls.assert_rpc_descs_have_methods end + # This should be called while holding @run_mutex def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs, handlers = rpc_descs, rpc_handlers + specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {}) cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym fail "already registered: rpc #{route} from #{spec}" if specs.key? route diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index be6331d68bb..dfaec6d6edc 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -220,19 +220,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**opts) end - after(:each) do - @srv.stop - end - it 'starts out false' do expect(@srv.stopped?).to be(false) end - it 'stays false after a #stop is called before #run' do - @srv.stop - expect(@srv.stopped?).to be(false) - end - it 'stays false after the server starts running', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -247,8 +238,8 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running @srv.stop - expect(@srv.stopped?).to be(true) t.join + expect(@srv.stopped?).to be(true) end end @@ -266,9 +257,7 @@ describe GRPC::RpcServer do server_override: @server } r = RpcServer.new(**opts) - r.run - expect(r.running?).to be(false) - r.stop + expect { r.run }.to raise_error(RuntimeError) end it 'is true after run is called with a registered service' do @@ -293,10 +282,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**@opts) end - after(:each) do - @srv.stop - end - it 'raises if #run has already been called' do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -528,10 +513,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - after(:each) do - @srv.stop - end - it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service)