Merge pull request #3144 from tbetbetbe/grpc-ruby-fix-math-client-server

Grpc ruby fix math client and server
pull/3171/head
Stanley Cheung 10 years ago
commit a0581622bb
  1. 6
      src/ruby/bin/math_client.rb
  2. 16
      src/ruby/bin/math_server.rb
  3. 30
      src/ruby/ext/grpc/rb_server.c
  4. 15
      src/ruby/lib/grpc/generic/rpc_server.rb

@ -50,7 +50,7 @@ def do_div(stub)
GRPC.logger.info('----------------') GRPC.logger.info('----------------')
req = Math::DivArgs.new(dividend: 7, divisor: 3) req = Math::DivArgs.new(dividend: 7, divisor: 3)
GRPC.logger.info("div(7/3): req=#{req.inspect}") GRPC.logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, INFINITE_FUTURE) resp = stub.div(req, timeout: INFINITE_FUTURE)
GRPC.logger.info("Answer: #{resp.inspect}") GRPC.logger.info("Answer: #{resp.inspect}")
GRPC.logger.info('----------------') GRPC.logger.info('----------------')
end end
@ -71,7 +71,7 @@ def do_fib(stub)
GRPC.logger.info('----------------') GRPC.logger.info('----------------')
req = Math::FibArgs.new(limit: 11) req = Math::FibArgs.new(limit: 11)
GRPC.logger.info("fib(11): req=#{req.inspect}") GRPC.logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, INFINITE_FUTURE) resp = stub.fib(req, timeout: INFINITE_FUTURE)
resp.each do |r| resp.each do |r|
GRPC.logger.info("Answer: #{r.inspect}") GRPC.logger.info("Answer: #{r.inspect}")
end end
@ -86,7 +86,7 @@ def do_div_many(stub)
reqs << Math::DivArgs.new(dividend: 5, divisor: 2) reqs << Math::DivArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2) reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, 10) resp = stub.div_many(reqs, timeout: INFINITE_FUTURE)
resp.each do |r| resp.each do |r|
GRPC.logger.info("Answer: #{r.inspect}") GRPC.logger.info("Answer: #{r.inspect}")
end end

@ -41,9 +41,25 @@ $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'forwardable' require 'forwardable'
require 'grpc' require 'grpc'
require 'logger'
require 'math_services' require 'math_services'
require 'optparse' require 'optparse'
# RubyLogger defines a logger for gRPC based on the standard ruby logger.
module RubyLogger
def logger
LOGGER
end
LOGGER = Logger.new(STDOUT)
end
# GRPC is the general RPC module
module GRPC
# Inject the noop #logger if no module-level logger method has been injected.
extend RubyLogger
end
# Holds state for a fibonacci series # Holds state for a fibonacci series
class Fibber class Fibber
def initialize(limit) def initialize(limit)

@ -234,6 +234,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
grpc_call_error_detail_of(err), err); grpc_call_error_detail_of(err), err);
return Qnil; return Qnil;
} }
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) { if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st); grpc_request_call_stack_cleanup(&st);
@ -298,36 +299,15 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
if (s->wrapped != NULL) { if (s->wrapped != NULL) {
grpc_server_shutdown_and_notify(s->wrapped, cq, NULL); grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout); ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
if (!ev.success) { if (!ev.success) {
rb_warn("server shutdown failed, there will be a LEAKED object warning"); rb_warn("server shutdown failed, cancelling the calls, objects may leak");
return Qnil; grpc_server_cancel_all_calls(s->wrapped);
/* return Qfalse;
TODO: renable the rb_raise below.
At the moment if the timeout is INFINITE_FUTURE as recommended, the
pluck blocks forever, even though
the outstanding server_request_calls correctly fail on the other
thread that they are running on.
it's almost as if calls that fail on the other thread do not get
cleaned up by shutdown request, even though it caused htem to
terminate.
rb_raise(rb_eRuntimeError, "grpc server shutdown did not succeed");
return Qnil;
The workaround is just to use a timeout and return without really
shutting down the server, and rely on the grpc core garbage collection
it down as a 'LEAKED OBJECT'.
*/
} }
grpc_server_destroy(s->wrapped); grpc_server_destroy(s->wrapped);
s->wrapped = NULL; s->wrapped = NULL;
} }
return Qnil; return Qtrue;
} }
/* /*

@ -277,10 +277,11 @@ module GRPC
@stop_mutex.synchronize do @stop_mutex.synchronize do
@stopped = true @stopped = true
end end
@pool.stop
deadline = from_relative_time(@poll_period) deadline = from_relative_time(@poll_period)
return if @server.close(@cq, deadline)
deadline = from_relative_time(@poll_period)
@server.close(@cq, deadline) @server.close(@cq, deadline)
@pool.stop
end end
# determines if the server has been stopped # determines if the server has been stopped
@ -383,7 +384,6 @@ module GRPC
@pool.start @pool.start
@server.start @server.start
loop_handle_server_calls loop_handle_server_calls
@running = false
end end
# Sends UNAVAILABLE if there are too many unprocessed jobs # Sends UNAVAILABLE if there are too many unprocessed jobs
@ -414,14 +414,13 @@ module GRPC
fail 'not running' unless @running fail 'not running' unless @running
loop_tag = Object.new loop_tag = Object.new
until stopped? until stopped?
deadline = from_relative_time(@poll_period)
begin begin
an_rpc = @server.request_call(@cq, loop_tag, deadline) an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
c = new_active_server_call(an_rpc) c = new_active_server_call(an_rpc)
rescue Core::CallError, RuntimeError => e rescue Core::CallError, RuntimeError => e
# these might happen for various reasonse. The correct behaviour of # these might happen for various reasonse. The correct behaviour of
# the server is to log them and continue. # the server is to log them and continue, if it's not shutting down.
GRPC.logger.warn("server call failed: #{e}") GRPC.logger.warn("server call failed: #{e}") unless stopped?
next next
end end
unless c.nil? unless c.nil?
@ -431,6 +430,8 @@ module GRPC
end end
end end
end end
@running = false
GRPC.logger.info("stopped: #{self}")
end end
def new_active_server_call(an_rpc) def new_active_server_call(an_rpc)

Loading…
Cancel
Save