From d48d84da195642c89d1108567398d576f2d3382d Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 9 Mar 2016 11:10:20 -0800 Subject: [PATCH 1/3] Ruby: fix some synchronization code in server implementation --- src/ruby/.rubocop.yml | 3 + src/ruby/lib/grpc/generic/rpc_server.rb | 96 ++++++++++++++---------- src/ruby/spec/generic/rpc_server_spec.rb | 23 +----- 3 files changed, 60 insertions(+), 62 deletions(-) diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index ff5cf8db831..d13ce426553 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity: Metrics/PerceivedComplexity: Max: 8 + +Metrics/ClassLength: + Max: 250 diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2997c9918..eefa9577c47 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -107,7 +107,9 @@ module GRPC # Starts running the jobs in the thread pool. def start - fail 'already stopped' if @stopped + @stop_mutex.synchronize do + fail 'already stopped' if @stopped + end until @workers.size == @size.to_i next_thread = Thread.new do catch(:exit) do # allows { throw :exit } to kill a thread @@ -264,10 +266,13 @@ module GRPC @pool = Pool.new(@pool_size) @run_cond = ConditionVariable.new @run_mutex = Mutex.new - @running = false + # running_state can take 4 values: :not_started, :running, :stopping, and + # :stopped. State transitions can only proceed in that order. + @running_state = :not_started @server = RpcServer.setup_srv(server_override, @cq, **kw) - @stopped = false - @stop_mutex = Mutex.new + # Mutex to synchronize registration of services and registered service + # count. @run_mutex should not be acquired while holding @handle_mutex + @handle_mutex = Mutex.new end # stops a running server @@ -275,27 +280,28 @@ module GRPC # the call has no impact if the server is already stopped, otherwise # server's current call loop is it's last. def stop - return unless @running - @stop_mutex.synchronize do - @stopped = true + @run_mutex.synchronize do + fail 'Cannot stop before starting' if @running_state == :not_started + return if @running_state != :running + @running_state = :stopping end deadline = from_relative_time(@poll_period) - return if @server.close(@cq, deadline) - deadline = from_relative_time(@poll_period) @server.close(@cq, deadline) @pool.stop end - # determines if the server has been stopped - def stopped? - @stop_mutex.synchronize do - return @stopped + def running_state + @run_mutex.synchronize do + return @running_state end end - # determines if the server is currently running def running? - @running + running_state == :running + end + + def stopped? + running_state == :stopped end # Is called from other threads to wait for #run to start up the server. @@ -304,13 +310,11 @@ module GRPC # # @param timeout [Numeric] number of seconds to wait # @result [true, false] true if the server is running, false otherwise - def wait_till_running(timeout = 0.1) - end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 - while Time.now < end_time - @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? - sleep(sleep_period) + def wait_till_running(timeout = nil) + @run_mutex.synchronize do + @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started + return @running_state == :running end - running? end # Runs the server in its own thread, then waits for signal INT or TERM on @@ -360,11 +364,16 @@ module GRPC # @param service [Object|Class] a service class or object as described # above def handle(service) - fail 'cannot add services if the server is running' if running? - fail 'cannot add services if the server is stopped' if stopped? - cls = service.is_a?(Class) ? service : service.class - assert_valid_service_class(cls) - add_rpc_descs_for(service) + @run_mutex.synchronize do + unless @running_state == :not_started + fail 'cannot add services if the server has been started' + end + cls = service.is_a?(Class) ? service : service.class + assert_valid_service_class(cls) + @handle_mutex.synchronize do + add_rpc_descs_for(service) + end + end end # runs the server @@ -375,16 +384,13 @@ module GRPC # - #running? returns true after this is called, until #stop cause the # the server to stop. def run - if rpc_descs.size.zero? - GRPC.logger.warn('did not run as no services were present') - return - end @run_mutex.synchronize do - @running = true - @run_cond.signal + fail 'cannot run without registering services' if rpc_descs.size.zero? + @pool.start + @server.start + @running_state = :running + @run_cond.broadcast end - @pool.start - @server.start loop_handle_server_calls end @@ -413,9 +419,9 @@ module GRPC # handles calls to the server def loop_handle_server_calls - fail 'not running' unless @running + fail 'not started' if running_state == :not_started loop_tag = Object.new - until stopped? + while running_state == :running begin an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) break if (!an_rpc.nil?) && an_rpc.call.nil? @@ -430,11 +436,14 @@ module GRPC rescue Core::CallError, RuntimeError => e # these might happen for various reasonse. The correct behaviour of # the server is to log them and continue, if it's not shutting down. - GRPC.logger.warn("server call failed: #{e}") unless stopped? + if running_state == :running + GRPC.logger.warn("server call failed: #{e}") + end next end end - @running = false + # @running_state should be :stopping here + @run_mutex.synchronize { @running_state = :stopped } GRPC.logger.info("stopped: #{self}") end @@ -467,11 +476,15 @@ module GRPC protected def rpc_descs - @rpc_descs ||= {} + @handle_mutex.synchronize do + return @rpc_descs ||= {} + end end def rpc_handlers - @rpc_handlers ||= {} + @handle_mutex.synchronize do + @rpc_handlers ||= {} + end end def assert_valid_service_class(cls) @@ -484,9 +497,10 @@ module GRPC cls.assert_rpc_descs_have_methods end + # This should be called while holding @handle_mutex def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs, handlers = rpc_descs, rpc_handlers + specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {}) cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym fail "already registered: rpc #{route} from #{spec}" if specs.key? route diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index be6331d68bb..e16ba60387e 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -220,19 +220,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**opts) end - after(:each) do - @srv.stop - end - it 'starts out false' do expect(@srv.stopped?).to be(false) end - it 'stays false after a #stop is called before #run' do - @srv.stop - expect(@srv.stopped?).to be(false) - end - it 'stays false after the server starts running', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -247,8 +238,8 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running @srv.stop - expect(@srv.stopped?).to be(true) t.join + expect(@srv.stopped?).to be(true) end end @@ -266,9 +257,7 @@ describe GRPC::RpcServer do server_override: @server } r = RpcServer.new(**opts) - r.run - expect(r.running?).to be(false) - r.stop + expect { r.run }.to raise_error(RuntimeError) end it 'is true after run is called with a registered service' do @@ -293,10 +282,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**@opts) end - after(:each) do - @srv.stop - end - it 'raises if #run has already been called' do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -528,10 +513,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - after(:each) do - @srv.stop - end - it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service) From 3c77ff452fce5265c3454d26613bc6df529475ed Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 9 Mar 2016 11:44:56 -0800 Subject: [PATCH 2/3] Fix copyright --- src/ruby/spec/generic/rpc_server_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e16ba60387e..dfaec6d6edc 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without From 1d68520c22fc708cdd6043962a0f59964a8aaf60 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 9 Mar 2016 17:30:37 -0800 Subject: [PATCH 3/3] Remove extraneous mutex, restrict state transitions --- src/ruby/lib/grpc/generic/rpc_server.rb | 37 ++++++++++++++----------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index eefa9577c47..b30d19dd2bb 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -270,9 +270,6 @@ module GRPC # :stopped. State transitions can only proceed in that order. @running_state = :not_started @server = RpcServer.setup_srv(server_override, @cq, **kw) - # Mutex to synchronize registration of services and registered service - # count. @run_mutex should not be acquired while holding @handle_mutex - @handle_mutex = Mutex.new end # stops a running server @@ -283,7 +280,7 @@ module GRPC @run_mutex.synchronize do fail 'Cannot stop before starting' if @running_state == :not_started return if @running_state != :running - @running_state = :stopping + transition_running_state(:stopping) end deadline = from_relative_time(@poll_period) @server.close(@cq, deadline) @@ -296,6 +293,20 @@ module GRPC end end + # Can only be called while holding @run_mutex + def transition_running_state(target_state) + state_transitions = { + not_started: :running, + running: :stopping, + stopping: :stopped + } + if state_transitions[@running_state] == target_state + @running_state = target_state + else + fail "Bad server state transition: #{@running_state}->#{target_state}" + end + end + def running? running_state == :running end @@ -370,9 +381,7 @@ module GRPC end cls = service.is_a?(Class) ? service : service.class assert_valid_service_class(cls) - @handle_mutex.synchronize do - add_rpc_descs_for(service) - end + add_rpc_descs_for(service) end end @@ -388,7 +397,7 @@ module GRPC fail 'cannot run without registering services' if rpc_descs.size.zero? @pool.start @server.start - @running_state = :running + transition_running_state(:running) @run_cond.broadcast end loop_handle_server_calls @@ -443,7 +452,7 @@ module GRPC end end # @running_state should be :stopping here - @run_mutex.synchronize { @running_state = :stopped } + @run_mutex.synchronize { transition_running_state(:stopped) } GRPC.logger.info("stopped: #{self}") end @@ -476,15 +485,11 @@ module GRPC protected def rpc_descs - @handle_mutex.synchronize do - return @rpc_descs ||= {} - end + @rpc_descs ||= {} end def rpc_handlers - @handle_mutex.synchronize do - @rpc_handlers ||= {} - end + @rpc_handlers ||= {} end def assert_valid_service_class(cls) @@ -497,7 +502,7 @@ module GRPC cls.assert_rpc_descs_have_methods end - # This should be called while holding @handle_mutex + # This should be called while holding @run_mutex def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})