diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6da7d3c8305..f4ae6fab846 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(grpc_rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); - return; + return Qnil; } ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); if (ev == NULL) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); - return; + return Qnil; } if (ev->data.op_complete != GRPC_OP_OK) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", ev->data.op_complete); - return; + return Qnil; } /* Build and return the BatchResult struct result */ diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index c6335791025..19b3e21cb6d 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -26,7 +26,7 @@ Gem::Specification.new do |s| s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests - s.add_dependency 'logging', '~> 1.8' + s.add_dependency 'logging', '~> 2.0' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_development_dependency 'simplecov', '~> 0.9' diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index b0f68035cd6..80b5743e914 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -30,6 +30,7 @@ require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' +require 'grpc/notifier' require 'grpc/version' require 'grpc/core/time_consts' require 'grpc/generic/active_call' diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 43ba5499059..947c39cd226 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -188,7 +188,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") + logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index b813ab5b542..4ca3004d6f0 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -123,8 +123,7 @@ module GRPC break if req.equal?(END_OF_READS) yield req end - @loop_th.join - @enq_th.join + @enq_th.join if @enq_th.alive? end # during bidi-streaming, read the requests to send from a separate thread diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 88c24aa92bb..3375fcf20ac 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -54,6 +54,18 @@ module GRPC end module_function :handle_signals + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + module_function :trap_signals + # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -172,17 +184,6 @@ module GRPC # Signal check period is 0.25s SIGNAL_CHECK_PERIOD = 0.25 - # Sets up a signal handler that adds signals to the signal handling global. - # - # Signal handlers should do as little as humanly possible. - # Here, they just add themselves to $grpc_signals - # - # RpcServer (and later other parts of gRPC) monitors the signals - # $grpc_signals in its own non-signal context. - def self.trap_signals - %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } - end - # setup_cq is used by #initialize to constuct a Core::CompletionQueue from # its arguments. def self.setup_cq(alt_cq) @@ -299,12 +300,12 @@ module GRPC # Runs the server in its own thread, then waits for signal INT or TERM on # the current thread to terminate it. def run_till_terminated - self.class.trap_signals + GRPC.trap_signals t = Thread.new { run } wait_till_running loop do sleep SIGNAL_CHECK_PERIOD - break unless handle_signals + break unless GRPC.handle_signals end stop t.join diff --git a/src/ruby/lib/grpc/notifier.rb b/src/ruby/lib/grpc/notifier.rb new file mode 100644 index 00000000000..caa18bbed6d --- /dev/null +++ b/src/ruby/lib/grpc/notifier.rb @@ -0,0 +1,60 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# GRPC contains the General RPC module. +module GRPC + # Notifier is useful high-level synchronization primitive. + class Notifier + attr_reader :payload, :notified + alias_method :notified?, :notified + + def initialize + @mutex = Mutex.new + @cvar = ConditionVariable.new + @notified = false + @payload = nil + end + + def wait + @mutex.synchronize do + @cvar.wait(@mutex) until notified? + end + end + + def notify(payload) + @mutex.synchronize do + return Error.new('already notified') if notified? + @payload = payload + @notified = true + @cvar.signal + return nil + end + end + end +end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 88c6b44c229..98d68ccfbb8 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -29,37 +29,8 @@ require 'grpc' -# Notifier is useful high-level synchronization primitive. -class Notifier - attr_reader :payload, :notified - alias_method :notified?, :notified - - def initialize - @mutex = Mutex.new - @cvar = ConditionVariable.new - @notified = false - @payload = nil - end - - def wait - @mutex.synchronize do - @cvar.wait(@mutex) until notified? - end - end - - def notify(payload) - @mutex.synchronize do - return Error.new('already notified') if notified? - @payload = payload - @notified = true - @cvar.signal - return nil - end - end -end - def wakey_thread(&blk) - n = Notifier.new + n = GRPC::Notifier.new t = Thread.new do blk.call(n) end