Update RPC server to use the new call API

pull/1188/head
Tim Emiola 10 years ago
parent 5684b4073c
commit b22a21ebe7
  1. 79
      src/ruby/lib/grpc/generic/rpc_server.rb

@ -38,7 +38,7 @@ module GRPC
# RpcServer hosts a number of services and makes them available on the # RpcServer hosts a number of services and makes them available on the
# network. # network.
class RpcServer class RpcServer
include Core::CompletionType include Core::CallOps
include Core::TimeConsts include Core::TimeConsts
extend ::Forwardable extend ::Forwardable
@ -202,20 +202,14 @@ module GRPC
end end
@pool.start @pool.start
@server.start @server.start
server_tag = Object.new request_call_tag = Object.new
until stopped? until stopped?
@server.request_call(server_tag) deadline = from_relative_time(@poll_period)
ev = @cq.pluck(server_tag, @poll_period) an_rpc = @server.request_call(@cq, request_call_tag, deadline)
next if ev.nil? next if an_rpc.nil?
if ev.type != SERVER_RPC_NEW c = new_active_server_call(an_rpc)
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end
c = new_active_server_call(ev.call, ev.result)
unless c.nil? unless c.nil?
mth = ev.result.method.to_sym mth = an_rpc.method.to_sym
ev.close
@pool.schedule(c) do |call| @pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end end
@ -224,46 +218,49 @@ module GRPC
@running = false @running = false
end end
def new_active_server_call(call, new_server_rpc) # Sends UNAVAILABLE if there are too many unprocessed jobs
# Accept the call. This is necessary even if a status is to be sent def available?(an_rpc)
# back immediately
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata
call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata
# Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}") logger.info("waiting: #{jobs_count}, max: #{max}")
if @pool.jobs_waiting > @max_waiting_requests return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x } noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop, c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::UNAVAILABLE, '') c.send_status(StatusCodes::UNAVAILABLE, '')
return nil nil
end end
# Send NOT_FOUND if the method does not exist # Sends NOT_FOUND if the method can't be found
mth = new_server_rpc.method.to_sym def found?(an_rpc)
unless rpc_descs.key?(mth) mth = an_rpc.method.to_sym
logger.warn("NOT_FOUND: #{new_server_rpc}") return an_rpc if rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{an_rpc}")
noop = proc { |x| x } noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop, c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '') c.send_status(StatusCodes::NOT_FOUND, '')
return nil nil
end end
def new_active_server_call(an_rpc)
# Accept the call. This is necessary even if a status is to be sent
# back immediately
return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call
handle_call_tag = Object.new
an_rpc.call.metadata = an_rpc.metadata
# TODO: add a hook to send md
an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
SEND_INITIAL_METADATA => nil)
return nil unless available?(an_rpc)
return nil unless found?(an_rpc)
# Create the ActiveCall # Create the ActiveCall
rpc_desc = rpc_descs[mth] logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym]
ActiveCall.new(call, call_queue, ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
new_server_rpc.deadline, finished_tag: finished_tag) an_rpc.deadline)
end end
# Pool is a simple thread pool for running server requests. # Pool is a simple thread pool for running server requests.

Loading…
Cancel
Save