From e6be7f31e45ecc180a3dff7c2d0d6ae8f72017df Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 16 Apr 2015 14:14:34 -0700 Subject: [PATCH 1/7] Refactor: Move the Pool out from RpcServer --- src/ruby/lib/grpc/generic/rpc_server.rb | 184 +++++++++--------- src/ruby/spec/generic/rpc_server_pool_spec.rb | 4 +- 2 files changed, 99 insertions(+), 89 deletions(-) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index bc2211ef7ed..6910d952832 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -38,6 +38,103 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer @@ -320,93 +417,6 @@ module GRPC an_rpc.deadline) end - # Pool is a simple thread pool for running server requests. - class Pool - # Default keep alive period is 1s - DEFAULT_KEEP_ALIVE = 1 - - def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - @keep_alive = keep_alive - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end - - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end - end - @workers << next_thread - end - end - - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end - logger.info('stopped, all workers are shutdown') - end - end - protected def rpc_descs diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 8383dc1533e..a9a07dd06bd 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -30,9 +30,9 @@ require 'grpc' require 'xray/thread_dump_signal_handler' -Pool = GRPC::RpcServer::Pool +describe GRPC::Pool do + Pool = GRPC::Pool -describe Pool do describe '#new' do it 'raises if a non-positive size is used' do expect { Pool.new(0) }.to raise_error From f9e77b3972cc84a62c76e85d6112a91db6424e7d Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 16 Apr 2015 14:50:11 -0700 Subject: [PATCH 2/7] Refactor: Shorter methods in RpcServer --- src/ruby/lib/grpc/generic/rpc_server.rb | 91 +++++++++++++------------ 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 6910d952832..8d91c31a651 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -167,6 +167,24 @@ module GRPC %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } end + # setup_cq is used by #initialize to constuct a Core::CompletionQueue from + # its arguments. + def self.setup_cq(alt_cq) + return Core::CompletionQueue.new if alt_cq.nil? + unless alt_cq.is_a? Core::CompletionQueue + fail(TypeError, '!CompletionQueue') + end + alt_cq + end + + # setup_srv is used by #initialize to constuct a Core::Server from its + # arguments. + def self.setup_srv(alt_srv, cq, **kw) + return Core::Server.new(cq, kw) if alt_srv.nil? + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server + alt_srv + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -200,24 +218,8 @@ module GRPC completion_queue_override:nil, server_override:nil, **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - end - @cq = cq - - if server_override.nil? - srv = Core::Server.new(@cq, kw) - else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server - end - @server = srv - + @cq = RpcServer.setup_cq(completion_queue_override) + @server = RpcServer.setup_srv(server_override, @cq, **kw) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -356,19 +358,7 @@ module GRPC end @pool.start @server.start - request_call_tag = Object.new - until stopped? - deadline = from_relative_time(@poll_period) - an_rpc = @server.request_call(@cq, request_call_tag, deadline) - next if an_rpc.nil? - c = new_active_server_call(an_rpc) - unless c.nil? - mth = an_rpc.method.to_sym - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end - end - end + loop_handle_server_calls @running = false end @@ -395,6 +385,23 @@ module GRPC nil end + # handles calls to the server + def loop_handle_server_calls + fail 'not running' unless @running + request_call_tag = Object.new + until stopped? + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + c = new_active_server_call(an_rpc) + unless c.nil? + mth = an_rpc.method.to_sym + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) + end + end + end + end + def new_active_server_call(an_rpc) # Accept the call. This is necessary even if a status is to be sent # back immediately @@ -427,11 +434,9 @@ module GRPC @rpc_handlers ||= {} end - private - def assert_valid_service_class(cls) unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" + fail "#{cls} must 'include GenericService'" end if cls.rpc_descs.size == 0 fail "#{cls} should specify some rpc descriptions" @@ -441,21 +446,17 @@ module GRPC def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers + specs, handlers = rpc_descs, rpc_handlers cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + fail "already registered: rpc #{route} from #{spec}" if specs.key? route + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end From 3fd2be2e32ab9edf2030f06c707e0c5cca140755 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 16 Apr 2015 17:43:59 -0700 Subject: [PATCH 3/7] Adds a hook for returning the client connect metadata --- src/ruby/lib/grpc/generic/rpc_server.rb | 59 ++++++++++------ src/ruby/spec/generic/rpc_server_spec.rb | 90 +++++++++++++++++------- 2 files changed, 103 insertions(+), 46 deletions(-) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 8d91c31a651..aa6c7e09895 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -38,6 +38,23 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -185,6 +202,14 @@ module GRPC alt_srv end + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -212,14 +237,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) @cq = RpcServer.setup_cq(completion_queue_override) @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -279,22 +311,6 @@ module GRPC t.join end - # Handles the signals in $grpc_signals. - # - # @return false if the server should exit, true if not. - def handle_signals - loop do - sig = $grpc_signals.shift - case sig - when 'INT' - return false - when 'TERM' - return false - end - end - true - end - # Determines if the server is currently stopped def stopped? @stopped ||= false @@ -403,16 +419,17 @@ module GRPC end def new_active_server_call(an_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call handle_call_tag = Object.new - an_rpc.call.metadata = an_rpc.metadata - # TODO: add a hook to send md + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) + end an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, - SEND_INITIAL_METADATA => nil) + SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless found?(an_rpc) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 1323bacfa60..202576c673d 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -301,21 +301,20 @@ describe GRPC::RpcServer do end describe '#run' do - before(:each) do - @client_opts = { - channel_override: @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end + let(:client_opts) { { channel_override: @ch } } + let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } + let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } + + context 'with no connect_metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end - describe 'when running' do it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -323,8 +322,8 @@ describe GRPC::RpcServer do req = EchoMsg.new blk = proc do cq = GRPC::Core::CompletionQueue.new - stub = GRPC::ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) + stub = GRPC::ClientStub.new(@host, cq, **client_opts) + stub.request_response('/unknown', req, marshal, unmarshal) end expect(&blk).to raise_error GRPC::BadStatus @srv.stop @@ -337,7 +336,7 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join @@ -349,7 +348,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -363,7 +362,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] @@ -378,7 +377,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = 0.1 # too short for SlowService to respond blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus @@ -394,7 +393,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) Thread.new do # cancel the call sleep 0.1 @@ -411,11 +410,11 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - @client_opts[:update_metadata] = proc do |md| + client_opts[:update_metadata] = proc do |md| md[:k1] = 'updated-v1' md end - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', 'jwt_aud_uri' => "https://#{@host}/EchoService" }] @@ -433,7 +432,7 @@ describe GRPC::RpcServer do threads = [] n.times do threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) q << stub.an_rpc(req) end end @@ -461,7 +460,7 @@ describe GRPC::RpcServer do one_failed_as_unavailable = false n.times do threads << Thread.new do - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) begin stub.an_rpc(req) rescue GRPC::BadStatus => e @@ -474,5 +473,46 @@ describe GRPC::RpcServer do expect(one_failed_as_unavailable).to be(true) end end + + context 'with connect metadata' do + let(:test_md_proc) do + proc do |mth, md| + res = md.clone + res['method'] = mth + res['connect_k1'] = 'connect_v1' + res + end + end + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1, + connect_md_proc: test_md_proc + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + wanted_md = { + 'k1' => 'v1', + 'k2' => 'v2', + 'method' => '/EchoService/an_rpc', + 'connect_k1' => 'connect_v1' + } + expect(op.metadata).to eq(wanted_md) + @srv.stop + t.join + end + end end end From 1c5faea6734f00dd8327a8dbca939b797fd5bdfb Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Fri, 17 Apr 2015 12:39:25 -0700 Subject: [PATCH 4/7] Allow BadStatus to contain metadata --- src/ruby/lib/grpc/errors.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 35e9c02a946..f1201c17040 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -36,14 +36,15 @@ module GRPC # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. class BadStatus < StandardError - attr_reader :code, :details + attr_reader :code, :details, :metadata # @param code [Numeric] the status code # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') + def initialize(code, details = 'unknown cause', **kw) super("#{code}:#{details}") @code = code @details = details + @metadata = kw end # Converts the exception to a GRPC::Status for use in the networking @@ -51,7 +52,7 @@ module GRPC # # @return [Status] with the same code and details def to_status - Status.new(code, details) + Struct::Status.new(code, details, @metadata) end end From 8661a43f784e6c2689466355dd5a0bdab4567346 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Fri, 17 Apr 2015 13:29:59 -0700 Subject: [PATCH 5/7] Propagate metadata in BadStatus - allow BadStatus to contain metadata that's populated by keyword args - on servers, convert raised BadStatus metadata to trailers - on clients, convert trailers to BadStatus metadata when raising BadStatus --- src/ruby/.rubocop_todo.yml | 8 +-- src/ruby/lib/grpc/generic/active_call.rb | 13 +++- src/ruby/lib/grpc/generic/rpc_desc.rb | 10 +-- src/ruby/spec/generic/rpc_desc_spec.rb | 88 +++++++++++------------- src/ruby/spec/generic/rpc_server_spec.rb | 56 ++++++++++++++- 5 files changed, 113 insertions(+), 62 deletions(-) diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index 02136a81a91..6b8f336055a 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,18 +1,18 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-16 12:30:09 -0700 using RuboCop version 0.30.0. +# on 2015-04-17 12:36:26 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 34 +# Offense count: 30 Metrics/AbcSize: - Max: 36 + Max: 37 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 185 + Max: 179 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 8d63de41450..274372e4d52 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -39,7 +39,10 @@ class Struct return nil if status.nil? fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED if status.code != GRPC::Core::StatusCodes::OK - fail GRPC::BadStatus.new(status.code, status.details) + # raise BadStatus, propagating the metadata if present. + md = status.metadata + with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] + fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys) end status end @@ -192,9 +195,13 @@ module GRPC # @param details [String] details # @param assert_finished [true, false] when true(default), waits for # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + def send_status(code = OK, details = '', assert_finished = false, **kw) ops = { - SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details) + SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw) } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(@cq, self, INFINITE_FUTURE, ops) diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 3e48b8e51d7..22b80d8938b 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -82,10 +82,10 @@ module GRPC end send_status(active_call, OK, 'OK') rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. + # this is raised by handlers that want GRPC to send an application error + # code and detail message and some additional app-specific metadata. logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) + send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. @@ -135,9 +135,9 @@ module GRPC "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end - def send_status(active_client, code, details) + def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? - active_client.send_status(code, details, code == OK) + active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e logger.warn("Could not send status #{code}:#{details}") logger.warn(e) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index a68299465ce..e5d05c8b9b1 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -52,41 +52,47 @@ describe GRPC::RpcDesc do @ok_response = Object.new end + shared_examples 'it handles errors' do + it 'sends the specified status if BadStatus is raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) + this_desc.run_server_method(@call, method(:bad_status)) + end + + it 'sends status UNKNOWN if other StandardErrors are raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) + this_desc.run_server_method(@call, method(:other_error)) + end + + it 'absorbs CallError with no further action' do + expect(@call).to receive(:remote_read).once.and_raise(CallError) + blk = proc do + this_desc.run_server_method(@call, method(:fake_reqresp)) + end + expect(&blk).to_not raise_error + end + end + describe '#run_server_method' do describe 'for request responses' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) allow(@call).to receive(:gc) end - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) - @request_response.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) - @request_response.run_server_method(@call, method(:other_error)) - end - - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) - @request_response.run_server_method(@call, method(:fake_reqresp)) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) + this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -98,13 +104,14 @@ describe GRPC::RpcDesc do end it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @client_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) + false, {}) @client_streamer.run_server_method(@call, method(:other_error_alt)) end @@ -118,44 +125,26 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) @client_streamer.run_server_method(@call, method(:fake_clstream)) end end describe 'for server streaming' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) allow(@call).to receive(:gc) end - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) - @server_streamer.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) - @server_streamer.run_server_method(@call, method(:other_error)) - end - - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -172,20 +161,21 @@ describe GRPC::RpcDesc do it 'sends the specified status if BadStatus is raised' do e = GRPC::BadStatus.new(@bs_code, 'NOK') expect(@call).to receive(:run_server_bidi).and_raise(e) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @bidi_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:run_server_bidi).and_raise(StandardError) expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, - false) + false, {}) @bidi_streamer.run_server_method(@call, method(:other_error_alt)) end it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 202576c673d..f1d95be5539 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -58,7 +58,7 @@ class NoRpcImplementation rpc :an_rpc, EchoMsg, EchoMsg end -# A test service with an implementation. +# A test service with an echo implementation. class EchoService include GRPC::GenericService rpc :an_rpc, EchoMsg, EchoMsg @@ -77,6 +77,25 @@ end EchoStub = EchoService.rpc_stub_class +# A test service with an implementation that fails with BadStatus +class FailingService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :details, :code, :md + + def initialize(_default_var = 'ignored') + @details = 'app error' + @code = 101 + @md = { failed_method: 'an_rpc' } + end + + def an_rpc(_req, _call) + fail GRPC::BadStatus.new(@code, @details, **@md) + end +end + +FailingStub = FailingService.rpc_stub_class + # A slow test service. class SlowService include GRPC::GenericService @@ -514,5 +533,40 @@ describe GRPC::RpcServer do t.join end end + + context 'with metadata on failing' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send receive metadata failed response', server: true do + service = FailingService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = FailingStub.new(@host, **client_opts) + blk = proc { stub.an_rpc(req) } + + # confirm it raise the expected error + expect(&blk).to raise_error GRPC::BadStatus + + # call again and confirm exception has the expected fields + begin + blk.call + rescue GRPC::BadStatus => e + expect(e.code).to eq(service.code) + expect(e.details).to eq(service.details) + expect(e.metadata).to eq(service.md) + end + @srv.stop + t.join + end + end end end From 9a0ae0380c71811a44789901583929a1fe9daa83 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Fri, 17 Apr 2015 14:46:36 -0700 Subject: [PATCH 6/7] Adds support for updating the output metadata to calls --- src/ruby/.rubocop_todo.yml | 6 ++-- src/ruby/lib/grpc/generic/active_call.rb | 21 ++++++++++---- src/ruby/lib/grpc/generic/rpc_desc.rb | 2 +- src/ruby/spec/generic/rpc_desc_spec.rb | 21 ++++++++------ src/ruby/spec/generic/rpc_server_spec.rb | 35 ++++++++++++++++++++++-- 5 files changed, 64 insertions(+), 21 deletions(-) diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index 6b8f336055a..ed4a4438b3f 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-17 12:36:26 -0700 using RuboCop version 0.30.0. +# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -7,12 +7,12 @@ # Offense count: 30 Metrics/AbcSize: - Max: 37 + Max: 40 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 179 + Max: 184 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 274372e4d52..43ba5499059 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -122,6 +122,12 @@ module GRPC @metadata_tag = metadata_tag end + # output_metadata are provides access to hash that can be used to + # save metadata to be sent as trailer + def output_metadata + @output_metadata ||= {} + end + # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -164,10 +170,12 @@ module GRPC def finished batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, RECV_STATUS_ON_CLIENT => nil) - if @call.metadata.nil? - @call.metadata = batch_result.metadata - elsif !batch_result.metadata.nil? - @call.metadata.merge!(batch_result.metadata) + unless batch_result.status.nil? + if @call.metadata.nil? + @call.metadata = batch_result.status.metadata + else + @call.metadata.merge!(batch_result.status.metadata) + end end batch_result.check_status end @@ -445,12 +453,13 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline, :metadata) + SingleReqView = view_class(:cancelled, :deadline, :metadata, + :output_metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read, :metadata) + :each_remote_read, :metadata, :output_metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 22b80d8938b..10211ae2397 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -80,7 +80,7 @@ module GRPC else # is a bidi_stream active_call.run_server_bidi(mth) end - send_status(active_call, OK, 'OK') + send_status(active_call, OK, 'OK', **active_call.output_metadata) rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index e5d05c8b9b1..083632a080f 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -77,12 +77,12 @@ describe GRPC::RpcDesc do end describe '#run_server_method' do + let(:fake_md) { { k1: 'v1', k2: 'v2' } } describe 'for request responses' do let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) end it_behaves_like 'it handles errors' @@ -91,7 +91,9 @@ describe GRPC::RpcDesc do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -100,7 +102,6 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:multi_req_view).and_return(@call) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do @@ -125,7 +126,9 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @client_streamer.run_server_method(@call, method(:fake_clstream)) end end @@ -135,7 +138,6 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) end it_behaves_like 'it handles errors' @@ -144,7 +146,9 @@ describe GRPC::RpcDesc do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -155,7 +159,6 @@ describe GRPC::RpcDesc do enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th') allow(enq_th).to receive(:join) allow(rwl_th).to receive(:join) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do @@ -175,7 +178,9 @@ describe GRPC::RpcDesc do it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {}) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index f1d95be5539..c15d96926bc 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -64,12 +64,14 @@ class EchoService rpc :an_rpc, EchoMsg, EchoMsg attr_reader :received_md - def initialize(_default_var = 'ignored') + def initialize(**kw) + @trailing_metadata = kw @received_md = [] end def an_rpc(req, call) logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req end @@ -534,7 +536,7 @@ describe GRPC::RpcServer do end end - context 'with metadata on failing' do + context 'with returned metadata on failing' do before(:each) do server_opts = { server_override: @server, @@ -544,7 +546,7 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - it 'should send receive metadata failed response', server: true do + it 'should receive the metadata in the BadStatus', server: true do service = FailingService.new @srv.handle(service) t = Thread.new { @srv.run } @@ -568,5 +570,32 @@ describe GRPC::RpcServer do t.join end end + + context 'with returned metadata on passing' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } + service = EchoService.new(k1: 'out_v1', k2: 'out_v2') + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + expect(op.metadata).to eq(wanted_trailers) + @srv.stop + t.join + end + end end end From a80aa7d86a1aa1ae64780c23341fecfb06fac640 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Fri, 17 Apr 2015 16:49:30 -0700 Subject: [PATCH 7/7] clarify test descriptions --- src/ruby/spec/generic/rpc_server_spec.rb | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index c15d96926bc..2af569e0cd4 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -536,7 +536,7 @@ describe GRPC::RpcServer do end end - context 'with returned metadata on failing' do + context 'with trailing metadata' do before(:each) do server_opts = { server_override: @server, @@ -546,7 +546,7 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - it 'should receive the metadata in the BadStatus', server: true do + it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service) t = Thread.new { @srv.run } @@ -558,7 +558,7 @@ describe GRPC::RpcServer do # confirm it raise the expected error expect(&blk).to raise_error GRPC::BadStatus - # call again and confirm exception has the expected fields + # call again and confirm exception contained the trailing metadata. begin blk.call rescue GRPC::BadStatus => e @@ -569,19 +569,8 @@ describe GRPC::RpcServer do @srv.stop t.join end - end - context 'with returned metadata on passing' do - before(:each) do - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end - - it 'should send connect metadata to the client', server: true do + it 'should be received by the client', server: true do wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } service = EchoService.new(k1: 'out_v1', k2: 'out_v2') @srv.handle(service)