Merge pull request #1362 from tbetbetbe/grpc_ruby_interop_fixes

Grpc ruby interop fixes
pull/1367/head
Stanley Cheung 10 years ago
commit 884f4dad58
  1. 6
      src/ruby/ext/grpc/rb_call.c
  2. 2
      src/ruby/grpc.gemspec
  3. 1
      src/ruby/lib/grpc.rb
  4. 2
      src/ruby/lib/grpc/generic/active_call.rb
  5. 3
      src/ruby/lib/grpc/generic/bidi_call.rb
  6. 27
      src/ruby/lib/grpc/generic/rpc_server.rb
  7. 60
      src/ruby/lib/grpc/notifier.rb
  8. 31
      src/ruby/spec/generic/client_stub_spec.rb

@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(grpc_rb_eCallError, rb_raise(grpc_rb_eCallError,
"grpc_call_start_batch failed with %s (code=%d)", "grpc_call_start_batch failed with %s (code=%d)",
grpc_call_error_detail_of(err), err); grpc_call_error_detail_of(err), err);
return; return Qnil;
} }
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
if (ev == NULL) { if (ev == NULL) {
grpc_run_batch_stack_cleanup(&st); grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
return; return Qnil;
} }
if (ev->data.op_complete != GRPC_OP_OK) { if (ev->data.op_complete != GRPC_OP_OK) {
grpc_run_batch_stack_cleanup(&st); grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)",
ev->data.op_complete); ev->data.op_complete);
return; return Qnil;
} }
/* Build and return the BatchResult struct result */ /* Build and return the BatchResult struct result */

@ -26,7 +26,7 @@ Gem::Specification.new do |s|
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests
s.add_dependency 'logging', '~> 1.8' s.add_dependency 'logging', '~> 2.0'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
s.add_development_dependency 'simplecov', '~> 0.9' s.add_development_dependency 'simplecov', '~> 0.9'

@ -30,6 +30,7 @@
require 'grpc/errors' require 'grpc/errors'
require 'grpc/grpc' require 'grpc/grpc'
require 'grpc/logconfig' require 'grpc/logconfig'
require 'grpc/notifier'
require 'grpc/version' require 'grpc/version'
require 'grpc/core/time_consts' require 'grpc/core/time_consts'
require 'grpc/generic/active_call' require 'grpc/generic/active_call'

@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already # @param marshalled [false, true] indicates if the object is already
# marshalled. # marshalled.
def remote_send(req, marshalled = false) def remote_send(req, marshalled = false)
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled if marshalled
payload = req payload = req
else else

@ -123,8 +123,7 @@ module GRPC
break if req.equal?(END_OF_READS) break if req.equal?(END_OF_READS)
yield req yield req
end end
@loop_th.join @enq_th.join if @enq_th.alive?
@enq_th.join
end end
# during bidi-streaming, read the requests to send from a separate thread # during bidi-streaming, read the requests to send from a separate thread

@ -54,6 +54,18 @@ module GRPC
end end
module_function :handle_signals module_function :handle_signals
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def trap_signals
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end
module_function :trap_signals
# Pool is a simple thread pool. # Pool is a simple thread pool.
class Pool class Pool
# Default keep alive period is 1s # Default keep alive period is 1s
@ -172,17 +184,6 @@ module GRPC
# Signal check period is 0.25s # Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25 SIGNAL_CHECK_PERIOD = 0.25
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def self.trap_signals
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end
# setup_cq is used by #initialize to constuct a Core::CompletionQueue from # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
# its arguments. # its arguments.
def self.setup_cq(alt_cq) def self.setup_cq(alt_cq)
@ -299,12 +300,12 @@ module GRPC
# Runs the server in its own thread, then waits for signal INT or TERM on # Runs the server in its own thread, then waits for signal INT or TERM on
# the current thread to terminate it. # the current thread to terminate it.
def run_till_terminated def run_till_terminated
self.class.trap_signals GRPC.trap_signals
t = Thread.new { run } t = Thread.new { run }
wait_till_running wait_till_running
loop do loop do
sleep SIGNAL_CHECK_PERIOD sleep SIGNAL_CHECK_PERIOD
break unless handle_signals break unless GRPC.handle_signals
end end
stop stop
t.join t.join

@ -0,0 +1,60 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# GRPC contains the General RPC module.
module GRPC
# Notifier is useful high-level synchronization primitive.
class Notifier
attr_reader :payload, :notified
alias_method :notified?, :notified
def initialize
@mutex = Mutex.new
@cvar = ConditionVariable.new
@notified = false
@payload = nil
end
def wait
@mutex.synchronize do
@cvar.wait(@mutex) until notified?
end
end
def notify(payload)
@mutex.synchronize do
return Error.new('already notified') if notified?
@payload = payload
@notified = true
@cvar.signal
return nil
end
end
end
end

@ -29,37 +29,8 @@
require 'grpc' require 'grpc'
# Notifier is useful high-level synchronization primitive.
class Notifier
attr_reader :payload, :notified
alias_method :notified?, :notified
def initialize
@mutex = Mutex.new
@cvar = ConditionVariable.new
@notified = false
@payload = nil
end
def wait
@mutex.synchronize do
@cvar.wait(@mutex) until notified?
end
end
def notify(payload)
@mutex.synchronize do
return Error.new('already notified') if notified?
@payload = payload
@notified = true
@cvar.signal
return nil
end
end
end
def wakey_thread(&blk) def wakey_thread(&blk)
n = Notifier.new n = GRPC::Notifier.new
t = Thread.new do t = Thread.new do
blk.call(n) blk.call(n)
end end

Loading…
Cancel
Save