Updates the module name in the idiomatic and stub layers

pull/567/head
Tim Emiola 10 years ago
parent 409e6c804b
commit 7e7911f70d
  1. 2
      src/ruby/lib/grpc.rb
  2. 60
      src/ruby/lib/grpc/auth/compute_engine.rb
  3. 42
      src/ruby/lib/grpc/auth/service_account.rb
  4. 21
      src/ruby/lib/grpc/core/event.rb
  5. 69
      src/ruby/lib/grpc/core/time_consts.rb
  6. 50
      src/ruby/lib/grpc/errors.rb
  7. 904
      src/ruby/lib/grpc/generic/active_call.rb
  8. 318
      src/ruby/lib/grpc/generic/bidi_call.rb
  9. 704
      src/ruby/lib/grpc/generic/client_stub.rb
  10. 201
      src/ruby/lib/grpc/generic/rpc_desc.rb
  11. 640
      src/ruby/lib/grpc/generic/rpc_server.rb
  12. 312
      src/ruby/lib/grpc/generic/service.rb
  13. 6
      src/ruby/lib/grpc/logconfig.rb
  14. 8
      src/ruby/lib/grpc/version.rb
  15. 4
      src/ruby/spec/auth/compute_engine_spec.rb
  16. 4
      src/ruby/spec/auth/service_account_spec.rb

@ -41,4 +41,4 @@ require 'grpc/generic/service'
require 'grpc/generic/rpc_server' require 'grpc/generic/rpc_server'
# alias GRPC # alias GRPC
GRPC = Google::RPC GRPC = GRPC

@ -30,39 +30,37 @@
require 'faraday' require 'faraday'
require 'grpc/auth/signet' require 'grpc/auth/signet'
module Google module GRPC
module RPC # Module Auth provides classes that provide Google-specific authentication
# Module Auth provides classes that provide Google-specific authentication # used to access Google gRPC services.
# used to access Google gRPC services. module Auth
module Auth # Extends Signet::OAuth2::Client so that the auth token is obtained from
# Extends Signet::OAuth2::Client so that the auth token is obtained from # the GCE metadata server.
# the GCE metadata server. class GCECredentials < Signet::OAuth2::Client
class GCECredentials < Signet::OAuth2::Client COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\
COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\ 'instance/service-accounts/default/token'
'instance/service-accounts/default/token' COMPUTE_CHECK_URI = 'http://metadata.google.internal'
COMPUTE_CHECK_URI = 'http://metadata.google.internal'
# Detect if this appear to be a GCE instance, by checking if metadata # Detect if this appear to be a GCE instance, by checking if metadata
# is available # is available
def self.on_gce?(options = {}) def self.on_gce?(options = {})
c = options[:connection] || Faraday.default_connection c = options[:connection] || Faraday.default_connection
resp = c.get(COMPUTE_CHECK_URI) resp = c.get(COMPUTE_CHECK_URI)
return false unless resp.status == 200 return false unless resp.status == 200
return false unless resp.headers.key?('Metadata-Flavor') return false unless resp.headers.key?('Metadata-Flavor')
return resp.headers['Metadata-Flavor'] == 'Google' return resp.headers['Metadata-Flavor'] == 'Google'
rescue Faraday::ConnectionFailed rescue Faraday::ConnectionFailed
return false return false
end end
# Overrides the super class method to change how access tokens are # Overrides the super class method to change how access tokens are
# fetched. # fetched.
def fetch_access_token(options = {}) def fetch_access_token(options = {})
c = options[:connection] || Faraday.default_connection c = options[:connection] || Faraday.default_connection
c.headers = { 'Metadata-Flavor' => 'Google' } c.headers = { 'Metadata-Flavor' => 'Google' }
resp = c.get(COMPUTE_AUTH_TOKEN_URI) resp = c.get(COMPUTE_AUTH_TOKEN_URI)
Signet::OAuth2.parse_credentials(resp.body, Signet::OAuth2.parse_credentials(resp.body,
resp.headers['content-type']) resp.headers['content-type'])
end
end end
end end
end end

@ -39,29 +39,27 @@ def read_json_key(json_key_io)
[json_key['private_key'], json_key['client_email']] [json_key['private_key'], json_key['client_email']]
end end
module Google module GRPC
module RPC # Module Auth provides classes that provide Google-specific authentication
# Module Auth provides classes that provide Google-specific authentication # used to access Google gRPC services.
# used to access Google gRPC services. module Auth
module Auth # Authenticates requests using Google's Service Account credentials.
# Authenticates requests using Google's Service Account credentials. # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount)
# (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount) class ServiceAccountCredentials < Signet::OAuth2::Client
class ServiceAccountCredentials < Signet::OAuth2::Client TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token'
TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token' AUDIENCE = TOKEN_CRED_URI
AUDIENCE = TOKEN_CRED_URI
# Initializes a ServiceAccountCredentials. # Initializes a ServiceAccountCredentials.
# #
# @param scope [string|array] the scope(s) to access # @param scope [string|array] the scope(s) to access
# @param json_key_io [IO] an IO from which the JSON key can be read # @param json_key_io [IO] an IO from which the JSON key can be read
def initialize(scope, json_key_io) def initialize(scope, json_key_io)
private_key, client_email = read_json_key(json_key_io) private_key, client_email = read_json_key(json_key_io)
super(token_credential_uri: TOKEN_CRED_URI, super(token_credential_uri: TOKEN_CRED_URI,
audience: AUDIENCE, audience: AUDIENCE,
scope: scope, scope: scope,
issuer: client_email, issuer: client_email,
signing_key: OpenSSL::PKey::RSA.new(private_key)) signing_key: OpenSSL::PKey::RSA.new(private_key))
end
end end
end end
end end

@ -27,16 +27,17 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
module Google require 'grpc'
module RPC
module Core # GRPC contains the General RPC module.
# Event is a class defined in the c extension module GRPC
# module Core
# Here, we add an inspect method. # Event is a class defined in the c extension
class Event #
def inspect # Here, we add an inspect method.
"<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>" class Event
end def inspect
"<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
end end
end end
end end

@ -29,44 +29,43 @@
require 'grpc' require 'grpc'
module Google # GRPC contains the General RPC module.
module RPC module GRPC
module Core module Core
# TimeConsts is a module from the C extension. # TimeConsts is a module from the C extension.
#
# Here it's re-opened to add a utility func.
module TimeConsts
# Converts a time delta to an absolute deadline.
# #
# Here it's re-opened to add a utility func. # Assumes timeish is a relative time, and converts its to an absolute,
module TimeConsts # with following exceptions:
# Converts a time delta to an absolute deadline. #
# # * if timish is one of the TimeConsts.TimeSpec constants the value is
# Assumes timeish is a relative time, and converts its to an absolute, # preserved.
# with following exceptions: # * timish < 0 => TimeConsts.INFINITE_FUTURE
# # * timish == 0 => TimeConsts.ZERO
# * if timish is one of the TimeConsts.TimeSpec constants the value is #
# preserved. # @param timeish [Number|TimeSpec]
# * timish < 0 => TimeConsts.INFINITE_FUTURE # @return timeish [Number|TimeSpec]
# * timish == 0 => TimeConsts.ZERO def from_relative_time(timeish)
# if timeish.is_a? TimeSpec
# @param timeish [Number|TimeSpec] timeish
# @return timeish [Number|TimeSpec] elsif timeish.nil?
def from_relative_time(timeish) TimeConsts::ZERO
if timeish.is_a? TimeSpec elsif !timeish.is_a? Numeric
timeish fail(TypeError,
elsif timeish.nil? "Cannot make an absolute deadline from #{timeish.inspect}")
TimeConsts::ZERO elsif timeish < 0
elsif !timeish.is_a? Numeric TimeConsts::INFINITE_FUTURE
fail(TypeError, elsif timeish == 0
"Cannot make an absolute deadline from #{timeish.inspect}") TimeConsts::ZERO
elsif timeish < 0 else
TimeConsts::INFINITE_FUTURE Time.now + timeish
elsif timeish == 0
TimeConsts::ZERO
else
Time.now + timeish
end
end end
module_function :from_relative_time
end end
module_function :from_relative_time
end end
end end
end end

@ -29,35 +29,33 @@
require 'grpc' require 'grpc'
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC # OutOfTime is an exception class that indicates that an RPC exceeded its
# OutOfTime is an exception class that indicates that an RPC exceeded its # deadline.
# deadline. OutOfTime = Class.new(StandardError)
OutOfTime = Class.new(StandardError)
# BadStatus is an exception class that indicates that an error occurred at # BadStatus is an exception class that indicates that an error occurred at
# either end of a GRPC connection. When raised, it indicates that a status # either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when # error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error. # caught it means that this end received a status error.
class BadStatus < StandardError class BadStatus < StandardError
attr_reader :code, :details attr_reader :code, :details
# @param code [Numeric] the status code # @param code [Numeric] the status code
# @param details [String] the details of the exception # @param details [String] the details of the exception
def initialize(code, details = 'unknown cause') def initialize(code, details = 'unknown cause')
super("#{code}:#{details}") super("#{code}:#{details}")
@code = code @code = code
@details = details @details = details
end end
# Converts the exception to a GRPC::Status for use in the networking # Converts the exception to a GRPC::Status for use in the networking
# wrapper layer. # wrapper layer.
# #
# @return [Status] with the same code and details # @return [Status] with the same code and details
def to_status def to_status
Status.new(code, details) Status.new(code, details)
end
end end
end end
end end

@ -36,502 +36,500 @@ def assert_event_type(ev, want)
fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
end end
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC # The ActiveCall class provides simple methods for sending marshallable
# The ActiveCall class provides simple methods for sending marshallable # data to a call
# data to a call class ActiveCall
class ActiveCall include Core::CompletionType
include Core::CompletionType include Core::StatusCodes
include Core::StatusCodes include Core::TimeConsts
include Core::TimeConsts attr_reader(:deadline)
attr_reader(:deadline)
# client_invoke begins a client invocation.
# client_invoke begins a client invocation. #
# # Flow Control note: this blocks until flow control accepts that client
# Flow Control note: this blocks until flow control accepts that client # request can go ahead.
# request can go ahead. #
# # deadline is the absolute deadline for the call.
# deadline is the absolute deadline for the call. #
# # == Keyword Arguments ==
# == Keyword Arguments == # any keyword arguments are treated as metadata to be sent to the server
# any keyword arguments are treated as metadata to be sent to the server # if a keyword value is a list, multiple metadata for it's key are sent
# if a keyword value is a list, multiple metadata for it's key are sent #
# # @param call [Call] a call on which to start and invocation
# @param call [Call] a call on which to start and invocation # @param q [CompletionQueue] the completion queue
# @param q [CompletionQueue] the completion queue # @param deadline [Fixnum,TimeSpec] the deadline
# @param deadline [Fixnum,TimeSpec] the deadline def self.client_invoke(call, q, _deadline, **kw)
def self.client_invoke(call, q, _deadline, **kw) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue
unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue')
fail(ArgumentError, 'not a CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
client_metadata_read = Object.new
finished_tag = Object.new
call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
end
# Creates an ActiveCall.
#
# ActiveCall should only be created after a call is accepted. That
# means different things on a client and a server. On the client, the
# call is accepted after calling call.invoke. On the server, this is
# after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
end end
call.add_metadata(kw) if kw.length > 0
client_metadata_read = Object.new
finished_tag = Object.new
call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
end
# Obtains the status of the call. # Creates an ActiveCall.
# #
# this value is nil until the call completes # ActiveCall should only be created after a call is accepted. That
# @return this call's status # means different things on a client and a server. On the client, the
def status # call is accepted after calling call.invoke. On the server, this is
@call.status # after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
end
# Obtains the metadata of the call. # Obtains the status of the call.
# #
# At the start of the call this will be nil. During the call this gets # this value is nil until the call completes
# some values as soon as the other end of the connection acknowledges the # @return this call's status
# request. def status
# @call.status
# @return this calls's metadata end
def metadata
@call.metadata
end
# Cancels the call. # Obtains the metadata of the call.
# #
# Cancels the call. The call does not return any result, but once this it # At the start of the call this will be nil. During the call this gets
# has been called, the call should eventually terminate. Due to potential # some values as soon as the other end of the connection acknowledges the
# races between the execution of the cancel and the in-flight request, the # request.
# result of the call after calling #cancel is indeterminate: #
# # @return this calls's metadata
# - the call may terminate with a BadStatus exception, with code=CANCELLED def metadata
# - the call may terminate with OK Status, and return a response @call.metadata
# - the call may terminate with a different BadStatus exception if that end
# was happening
def cancel
@call.cancel
end
# indicates if the call is shutdown # Cancels the call.
def shutdown #
@shutdown ||= false # Cancels the call. The call does not return any result, but once this it
end # has been called, the call should eventually terminate. Due to potential
# races between the execution of the cancel and the in-flight request, the
# result of the call after calling #cancel is indeterminate:
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that
# was happening
def cancel
@call.cancel
end
# indicates if the call is cancelled. # indicates if the call is shutdown
def cancelled def shutdown
@cancelled ||= false @shutdown ||= false
end end
# multi_req_view provides a restricted view of this ActiveCall for use # indicates if the call is cancelled.
# in a server client-streaming handler. def cancelled
def multi_req_view @cancelled ||= false
MultiReqView.new(self) end
end
# single_req_view provides a restricted view of this ActiveCall for use in # multi_req_view provides a restricted view of this ActiveCall for use
# a server request-response handler. # in a server client-streaming handler.
def single_req_view def multi_req_view
SingleReqView.new(self) MultiReqView.new(self)
end end
# operation provides a restricted view of this ActiveCall for use as # single_req_view provides a restricted view of this ActiveCall for use in
# a Operation. # a server request-response handler.
def operation def single_req_view
Operation.new(self) SingleReqView.new(self)
end end
# writes_done indicates that all writes are completed. # operation provides a restricted view of this ActiveCall for use as
# # a Operation.
# It blocks until the remote endpoint acknowledges by sending a FINISHED def operation
# event, unless assert_finished is set to false. Any calls to Operation.new(self)
# #remote_send after this call will fail. end
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close
end
return unless assert_finished # writes_done indicates that all writes are completed.
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) #
fail 'unexpected nil event' if ev.nil? # It blocks until the remote endpoint acknowledges by sending a FINISHED
# event, unless assert_finished is set to false. Any calls to
# #remote_send after this call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close ev.close
@call.status
end end
# finished waits until the call is completed. return unless assert_finished
# ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
# It blocks until the remote endpoint acknowledges by sending a FINISHED fail 'unexpected nil event' if ev.nil?
# event. ev.close
def finished @call.status
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) end
begin
fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@call.metadata.merge!(ev.result.metadata)
end
if ev.result.code != Core::StatusCodes::OK
fail BadStatus.new(ev.result.code, ev.result.details)
end
res = ev.result
ensure
ev.close
end
res
end
# remote_send sends a request to the remote endpoint. # finished waits until the call is completed.
# #
# It blocks until the remote endpoint acknowledges by sending a # It blocks until the remote endpoint acknowledges by sending a FINISHED
# WRITE_ACCEPTED. req can be marshalled already. # event.
# def finished
# @param req [Object, String] the object to send or it's marshal form. ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
# @param marshalled [false, true] indicates if the object is already begin
# marshalled. fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
def remote_send(req, marshalled = false) if @call.metadata.nil?
assert_queue_is_ready @call.metadata = ev.result.metadata
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
if marshalled
payload = req
else else
payload = @marshal.call(req) @call.metadata.merge!(ev.result.metadata)
end
@call.start_write(Core::ByteBuffer.new(payload), self)
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end end
end
# send_status sends a status to the remote endpoint if ev.result.code != Core::StatusCodes::OK
# fail BadStatus.new(ev.result.code, ev.result.details)
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def send_status(code = OK, details = '', assert_finished = false)
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end end
logger.debug("Status sent: #{code}:'#{details}'") res = ev.result
return finished if assert_finished ensure
nil ev.close
end end
res
end
# remote_read reads a response from the remote endpoint. # remote_send sends a request to the remote endpoint.
# #
# It blocks until the remote endpoint sends a READ or FINISHED event. On # It blocks until the remote endpoint acknowledges by sending a
# a READ, it returns the response after unmarshalling it. On # WRITE_ACCEPTED. req can be marshalled already.
# FINISHED, it returns nil if the status is OK, otherwise raising #
# BadStatus # @param req [Object, String] the object to send or it's marshal form.
def remote_read # @param marshalled [false, true] indicates if the object is already
if @call.metadata.nil? && !@read_metadata_tag.nil? # marshalled.
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) def remote_send(req, marshalled = false)
assert_event_type(ev, CLIENT_METADATA_READ) assert_queue_is_ready
@call.metadata = ev.result logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
@read_metadata_tag = nil if marshalled
end payload = req
else
payload = @marshal.call(req)
end
@call.start_write(Core::ByteBuffer.new(payload), self)
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
end
@call.start_read(self) # send_status sends a status to the remote endpoint
ev = @cq.pluck(self, INFINITE_FUTURE) #
begin # @param code [int] the status code to send
assert_event_type(ev, READ) # @param details [String] details
logger.debug("received req: #{ev.result.inspect}") # @param assert_finished [true, false] when true(default), waits for
unless ev.result.nil? # FINISHED.
logger.debug("received req.to_s: #{ev.result}") def send_status(code = OK, details = '', assert_finished = false)
res = @unmarshal.call(ev.result.to_s) assert_queue_is_ready
logger.debug("received_req (unmarshalled): #{res.inspect}") @call.start_write_status(code, details, self)
return res ev = @cq.pluck(self, INFINITE_FUTURE)
end begin
ensure assert_event_type(ev, FINISH_ACCEPTED)
ev.close ensure
end ev.close
logger.debug('found nil; the final response has been sent')
nil
end end
logger.debug("Status sent: #{code}:'#{details}'")
return finished if assert_finished
nil
end
# each_remote_read passes each response to the given block or returns an # remote_read reads a response from the remote endpoint.
# enumerator the responses if no block is given. #
# # It blocks until the remote endpoint sends a READ or FINISHED event. On
# == Enumerator == # a READ, it returns the response after unmarshalling it. On
# # FINISHED, it returns nil if the status is OK, otherwise raising
# * #next blocks until the remote endpoint sends a READ or FINISHED # BadStatus
# * for each read, enumerator#next yields the response def remote_read
# * on status if @call.metadata.nil? && !@read_metadata_tag.nil?
# * if it's is OK, enumerator#next raises StopException ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
# * if is not OK, enumerator#next raises RuntimeException assert_event_type(ev, CLIENT_METADATA_READ)
# @call.metadata = ev.result
# == Block == @read_metadata_tag = nil
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
end end
# each_remote_read_then_finish passes each response to the given block or @call.start_read(self)
# returns an enumerator of the responses if no block is given. ev = @cq.pluck(self, INFINITE_FUTURE)
# begin
# It is like each_remote_read, but it blocks on finishing on detecting assert_event_type(ev, READ)
# the final message. logger.debug("received req: #{ev.result.inspect}")
# unless ev.result.nil?
# == Enumerator == logger.debug("received req.to_s: #{ev.result}")
# res = @unmarshal.call(ev.result.to_s)
# * #next blocks until the remote endpoint sends a READ or FINISHED logger.debug("received_req (unmarshalled): #{res.inspect}")
# * for each read, enumerator#next yields the response return res
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
end
yield resp
end end
ensure
ev.close
end end
logger.debug('found nil; the final response has been sent')
nil
end
# request_response sends a request to a GRPC server, and returns the # each_remote_read passes each response to the given block or returns an
# response. # enumerator the responses if no block is given.
# #
# == Keyword Arguments == # == Enumerator ==
# any keyword arguments are treated as metadata to be sent to the server #
# if a keyword value is a list, multiple metadata for it's key are sent # * #next blocks until the remote endpoint sends a READ or FINISHED
# # * for each read, enumerator#next yields the response
# @param req [Object] the request sent to the server # * on status
# @return [Object] the response received from the server # * if it's is OK, enumerator#next raises StopException
def request_response(req, **kw) # * if is not OK, enumerator#next raises RuntimeException
start_call(**kw) unless @started #
remote_send(req) # == Block ==
writes_done(false) #
response = remote_read # * if provided it is executed for each response
finished unless response.is_a? Struct::Status # * the call blocks until no more responses are provided
response #
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end end
end
# client_streamer sends a stream of requests to a GRPC server, and # each_remote_read_then_finish passes each response to the given block or
# returns a single response. # returns an enumerator of the responses if no block is given.
# #
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's # It is like each_remote_read, but it blocks on finishing on detecting
# #each enumeration protocol. In the simplest case, requests will be an # the final message.
# array of marshallable objects; in typical case it will be an Enumerable #
# that allows dynamic construction of the marshallable objects. # == Enumerator ==
# #
# == Keyword Arguments == # * #next blocks until the remote endpoint sends a READ or FINISHED
# any keyword arguments are treated as metadata to be sent to the server # * for each read, enumerator#next yields the response
# if a keyword value is a list, multiple metadata for it's key are sent # * on status
# # * if it's is OK, enumerator#next raises StopException
# @param requests [Object] an Enumerable of requests to send # * if is not OK, enumerator#next raises RuntimeException
# @return [Object] the response received from the server #
def client_streamer(requests, **kw) # == Block ==
start_call(**kw) unless @started #
requests.each { |r| remote_send(r) } # * if provided it is executed for each response
writes_done(false) # * the call blocks until no more responses are provided
response = remote_read #
finished unless response.is_a? Struct::Status # @return [Enumerator] if no block was given
response def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
end
yield resp
end end
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @return [Object] the response received from the server
def request_response(req, **kw)
  start_call(**kw) unless @started
  remote_send(req)
  writes_done(false)
  response = remote_read
  finished unless response.is_a? Struct::Status
  response
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Object] the response received from the server
def client_streamer(requests, **kw)
  start_call(**kw) unless @started
  requests.each { |r| remote_send(r) }
  writes_done(false)
  response = remote_read
  finished unless response.is_a? Struct::Status
  response
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server;
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, **kw)
  start_call(**kw) unless @started
  remote_send(req)
  writes_done(false)
  replies = enum_for(:each_remote_read_then_finish)
  return replies unless block_given?
  replies.each { |r| yield r }
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns an enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server;
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
  start_call(**kw) unless @started
  bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
                    @finished_tag)
  bd.run_on_client(requests, &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
  bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
                    @finished_tag)
  bd.run_on_server(gen_each_reply)
end
private

# Starts the call: sends the initial invocation and records the tags used
# to track completion and the arrival of initial metadata.
def start_call(**kw)
  tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
  @finished_tag, @read_metadata_tag = tags
  @started = true
end

# Builds an anonymous view class that exposes only the listed methods of a
# wrapped ActiveCall, delegating each of them to the wrapped instance.
def self.view_class(*visible_methods)
  Class.new do
    extend ::Forwardable
    def_delegators :@wrapped, *visible_methods

    # @param wrapped [ActiveCall] the call whose methods are shielded
    def initialize(wrapped)
      @wrapped = wrapped
    end
  end
end
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
SingleReqView = view_class(:cancelled, :deadline)

# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
                          :each_remote_read)

# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
                       :metadata, :status)
# confirms that no events are enqueued, and that the queue is not
# shutdown.
#
# Plucks with a ZERO deadline so the check never blocks: an OutOfTime is
# the expected outcome when the queue is quiescent.
def assert_queue_is_ready
  ev = nil
  begin
    ev = @cq.pluck(self, ZERO)
    fail "unexpected event #{ev.inspect}" unless ev.nil?
  rescue OutOfTime
    # expected: nothing should be on the queue and the deadline was ZERO,
    # except things using another tag
    logger.debug('timed out waiting for next event')
  ensure
    ev.close unless ev.nil?
  end
end
end end
end end

@ -36,186 +36,184 @@ def assert_event_type(ev, want)
fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
end end
# GRPC contains the General RPC module.
module GRPC
  # The BidiCall class orchestrates execution of a BiDi stream on a client
  # or server.
  class BidiCall
    include Core::CompletionType
    include Core::StatusCodes
    include Core::TimeConsts

    # Creates a BidiCall.
    #
    # BidiCall should only be created after a call is accepted. That means
    # different things on a client and a server. On the client, the call is
    # accepted after call.invoke. On the server, this is after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the BidiCall#run is called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param q [CompletionQueue] the completion queue used to accept
    #   the call
    # @param marshal [Function] f(obj)->string that marshal requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param finished_tag [Object] the object used as the call's finish tag
    def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
      fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
      unless q.is_a? Core::CompletionQueue
        fail(ArgumentError, 'not a CompletionQueue')
      end
      @call = call
      @cq = q
      @deadline = deadline
      @finished_tag = finished_tag
      @marshal = marshal
      @readq = Queue.new
      @unmarshal = unmarshal
    end

    # Begins orchestration of the Bidi stream for a client sending requests.
    #
    # The method either returns an Enumerator of the responses, or accepts a
    # block that can be invoked with each response.
    #
    # @param requests the Enumerable of requests to send
    # @return an Enumerator of requests to yield
    def run_on_client(requests, &blk)
      enq_th = start_write_loop(requests)
      loop_th = start_read_loop
      replies = each_queued_msg
      return replies if blk.nil?
      replies.each { |r| blk.call(r) }
      enq_th.join
      loop_th.join
    end

    # Begins orchestration of the Bidi stream for a server generating replies.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that must necessarily be one. E.g, the replies
    # produced by gen_each_reply could ignore the received_msgs
    #
    # @param gen_each_reply [Proc] generates the BiDi stream replies.
    def run_on_server(gen_each_reply)
      replies = gen_each_reply.call(each_queued_msg)
      enq_th = start_write_loop(replies, is_client: false)
      loop_th = start_read_loop
      loop_th.join
      enq_th.join
    end

    private

    END_OF_READS = :end_of_reads
    END_OF_WRITES = :end_of_writes

    # each_queued_msg yields each message on this instance's readq
    #
    # - messages are added to the readq by #read_loop
    # - iteration ends when the instance itself is added
    def each_queued_msg
      return enum_for(:each_queued_msg) unless block_given?
      count = 0
      loop do
        logger.debug("each_queued_msg: msg##{count}")
        count += 1
        req = @readq.pop
        # fail (not throw): a queued StandardError was pushed by the read
        # loop and must be re-raised here so the consumer sees the failure;
        # `throw` without a matching `catch` would raise UncaughtThrowError
        # instead of the original error.
        fail req if req.is_a? StandardError
        break if req.equal?(END_OF_READS)
        yield req
      end
    end

    # during bidi-streaming, read the requests to send from a separate thread
    # read so that read_loop does not block waiting for requests to read.
    def start_write_loop(requests, is_client: true)
      Thread.new do # TODO: run on a thread pool
        write_tag = Object.new
        begin
          count = 0
          requests.each do |req|
            count += 1
            payload = @marshal.call(req)
            @call.start_write(Core::ByteBuffer.new(payload), write_tag)
            ev = @cq.pluck(write_tag, INFINITE_FUTURE)
            begin
              assert_event_type(ev, WRITE_ACCEPTED)
            ensure
              ev.close
            end
          end
          if is_client
            @call.writes_done(write_tag)
            ev = @cq.pluck(write_tag, INFINITE_FUTURE)
            begin
              assert_event_type(ev, FINISH_ACCEPTED)
            ensure
              ev.close
            end
            logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
            ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
            begin
              assert_event_type(ev, FINISHED)
            ensure
              ev.close
            end
            logger.debug('bidi-client: finished received')
          end
        rescue StandardError => e
          logger.warn('bidi: write_loop failed')
          logger.warn(e)
        end
      end
    end

    # starts the read loop
    def start_read_loop
      Thread.new do
        begin
          read_tag = Object.new
          count = 0

          # queue the initial read before beginning the loop
          loop do
            logger.debug("waiting for read #{count}")
            count += 1
            @call.start_read(read_tag)
            ev = @cq.pluck(read_tag, INFINITE_FUTURE)
            begin
              assert_event_type(ev, READ)

              # handle the next event.
              if ev.result.nil?
                @readq.push(END_OF_READS)
                logger.debug('done reading!')
                break
              end

              # push the latest read onto the queue and continue reading
              logger.debug("received req: #{ev.result}")
              res = @unmarshal.call(ev.result.to_s)
              @readq.push(res)
            ensure
              ev.close
            end
          end
        rescue StandardError => e
          logger.warn('bidi: read_loop failed')
          logger.warn(e)
          @readq.push(e) # let each_queued_msg terminate with this error
        end
      end
    end
  end
end

@ -30,381 +30,379 @@
require 'grpc/generic/active_call' require 'grpc/generic/active_call'
require 'xray/thread_dump_signal_handler' require 'xray/thread_dump_signal_handler'
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC # ClientStub represents an endpoint used to send requests to GRPC servers.
# ClientStub represents an endpoint used to send requests to GRPC servers. class ClientStub
class ClientStub include Core::StatusCodes
include Core::StatusCodes
# Default deadline is 5 seconds. # Default deadline is 5 seconds.
DEFAULT_DEADLINE = 5 DEFAULT_DEADLINE = 5
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
# it wishes to access, e.g.,
#
#   my_stub = ClientStub.new(example.host.com:50505)
#
# Any arbitrary keyword arguments are treated as channel arguments used to
# configure the RPC connection to the host.
#
# There are some specific keyword args that are not used to configure the
# channel:
#
# - :channel_override
#   when present, this must be a pre-created GRPC::Channel. If it's
#   present the host and arbitrary keyword arg are ignored, and the RPC
#   connection uses this channel.
#
# - :deadline
#   when present, this is the default deadline used for calls
#
# - :update_metadata
#   when present, this a func that takes a hash and returns a hash
#   it can be used to update metadata, i.e, remove, change or
#   amend metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param creds [Core::Credentials] the channel credentials
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs] the channel arguments
def initialize(host, q,
               channel_override:nil,
               deadline: DEFAULT_DEADLINE,
               creds: nil,
               update_metadata: nil,
               **kw)
  unless q.is_a? Core::CompletionQueue
    fail(ArgumentError, 'not a CompletionQueue')
  end
  @queue = q

  # set the channel instance
  if !channel_override.nil?
    ch = channel_override
    fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
  else
    if creds.nil?
      ch = Core::Channel.new(host, kw)
    elsif !creds.is_a?(Core::Credentials)
      fail(ArgumentError, 'not a Credentials')
    else
      ch = Core::Channel.new(host, kw, creds)
    end
  end
  @ch = ch

  @update_metadata = nil
  unless update_metadata.nil?
    unless update_metadata.is_a? Proc
      fail(ArgumentError, 'update_metadata is not a Proc')
    end
    @update_metadata = update_metadata
  end

  @host = host
  @deadline = deadline
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * the requests is sent only when GRPC core's flow control allows it to
#   be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call returns the response
#
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline = nil,
                     return_op: false, **kw)
  c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
  md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
  return c.request_response(req, **md) unless return_op

  # return the operation view of the active_call; define #execute as a
  # new method for this instance that invokes #request_response.
  op = c.operation
  op.define_singleton_method(:execute) do
    c.request_response(req, **md)
  end
  op
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * each requests is sent only when GRPC core's flow control allows it to
#   be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call consumes the requests and returns
# the response.
#
# If return_op is true, the call returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
                    return_op: false, **kw)
  c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
  md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
  return c.client_streamer(requests, **md) unless return_op

  # return the operation view of the active_call; define #execute as a
  # new method for this instance that invokes #client_streamer.
  op = c.operation
  op.define_singleton_method(:execute) do
    c.client_streamer(requests, **md)
  end
  op
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields # server_streamer sends one request to the GRPC server, which yields a
# a stream of responses. # stream of responses.
# #
# This method takes an Enumerable of requests, and returns and enumerable # responses provides an enumerator over the streamed responses, i.e. it
# of responses. # follows Ruby's #each iteration protocol. The enumerator blocks while
# # waiting for each response, stops when the server signals that no
# == requests == # further responses will be supplied. If the implicit block is provided,
# # it is executed with each response as the argument and no result is
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's # returned.
# #each enumeration protocol. In the simplest case, requests will be an #
# array of marshallable objects; in typical case it will be an # == Flow Control ==
# Enumerable that allows dynamic construction of the marshallable # This is a blocking call.
# objects. #
# # * the request is sent only when GRPC core's flow control allows it to
# == responses == # be sent.
# #
# This is an enumerator of responses. I.e, its #next method blocks # * the request will not complete until the server sends the final
# waiting for the next response. Also, if at any point the block needs # response followed by a status message.
# to consume all the remaining responses, this can be done using #each or #
# #collect. Calling #each or #collect should only be done if # == Errors ==
# the_call#writes_done has been called, otherwise the block will loop # An RuntimeError is raised if
# forever. #
# # * the server responds with a non-OK status when any response is
# == Flow Control == # * retrieved
# This is a blocking call. #
# # * the deadline is exceeded
# * the call completes when the next call to provided block returns #
# * [False] # == Return Value ==
# #
# * the execution block parameters are two objects for sending and # if the return_op is false, the return value is an Enumerator of the
# receiving responses, each of which blocks waiting for flow control. # results, unless a block is provided, in which case the block is
# E.g, calles to bidi_call#remote_send will wait until flow control # executed with each response.
# allows another write before returning; and obviously calls to #
# responses#next block until the next response is available. # if return_op is true, the function returns an Operation whose #execute
# # method runs server streamer call. Again, Operation#execute either
# == Termination == # calls the given block with each response or returns an Enumerator of the
# # responses.
# As well as sending and receiving messages, the block passed to the #
# function is also responsible for: # == Keyword Args ==
# #
# * calling bidi_call#writes_done to indicate no further reqs will be # Unspecified keyword arguments are treated as metadata to be sent to the
# sent. # server.
# #
# * returning false if once the bidi stream is functionally completed. # @param method [String] the RPC method to call on the GRPC server
# # @param req [Object] the request sent to the server
# Note that response#next will indicate that there are no further # @param marshal [Function] f(obj)->string that marshals requests
# responses by throwing StopIteration, but can only happen either # @param unmarshal [Function] f(string)->obj that unmarshals responses
# if bidi_call#writes_done is called. # @param deadline [Numeric] the max completion time in seconds
# # @param return_op [true|false]return an Operation if true
# To terminate the RPC correctly the block: # @param blk [Block] when provided, is executed for each response
# # @return [Enumerator|Operation|nil] as discussed above
# * must call bidi#writes_done and then def server_streamer(method, req, marshal, unmarshal, deadline = nil,
#
# * either return false as soon as there is no need for other responses
#
# * loop on responses#next until no further responses are available
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs the Bidi call. Again, Operation#execute either calls a
# given block with each response or returns an Enumerator of the
# responses.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk) return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline) c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op return c.server_streamer(req, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute # return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer # as a new method for this instance that invokes #server_streamer
op = c.operation op = c.operation
op.define_singleton_method(:execute) do op.define_singleton_method(:execute) do
c.bidi_streamer(requests, **md, &blk) c.server_streamer(req, **md, &blk)
end
op
end end
op
end
private # bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Flow Control ==
# This is a blocking call.
#
# * the call completes when the next call to provided block returns
# * [False]
#
# * the execution block parameters are two objects for sending and
# receiving responses, each of which blocks waiting for flow control.
# E.g, calles to bidi_call#remote_send will wait until flow control
# allows another write before returning; and obviously calls to
# responses#next block until the next response is available.
#
# == Termination ==
#
# As well as sending and receiving messages, the block passed to the
# function is also responsible for:
#
# * calling bidi_call#writes_done to indicate no further reqs will be
# sent.
#
# * returning false if once the bidi stream is functionally completed.
#
# Note that response#next will indicate that there are no further
# responses by throwing StopIteration, but can only happen either
# if bidi_call#writes_done is called.
#
# To terminate the RPC correctly the block:
#
# * must call bidi#writes_done and then
#
# * either return false as soon as there is no need for other responses
#
# * loop on responses#next until no further responses are available
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs the Bidi call. Again, Operation#execute either calls a
# given block with each response or returns an Enumerator of the
# responses.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op
# Creates a new active stub # return the operation view of the active_call; define #execute
# # as a new method for this instance that invokes #bidi_streamer
# @param ch [GRPC::Channel] the channel used to create the stub. op = c.operation
# @param marshal [Function] f(obj)->string that marshals requests op.define_singleton_method(:execute) do
# @param unmarshal [Function] f(string)->obj that unmarshals responses c.bidi_streamer(requests, **md, &blk)
# @param deadline [TimeConst]
def new_active_call(ch, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
call = @ch.create_call(ch, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
end end
op
end
private
# Creates a new active stub
#
# @param ch [GRPC::Channel] the channel used to create the stub.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [TimeConst]
def new_active_call(ch, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
call = @ch.create_call(ch, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
end end
end end
end end

@ -29,123 +29,122 @@
require 'grpc/grpc' require 'grpc/grpc'
module Google # GRPC contains the General RPC module.
module RPC module GRPC
# RpcDesc is a Descriptor of an RPC method. # RpcDesc is a Descriptor of an RPC method.
class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
:unmarshal_method) :unmarshal_method)
include Core::StatusCodes include Core::StatusCodes
# Used to wrap a message class to indicate that it needs to be streamed. # Used to wrap a message class to indicate that it needs to be streamed.
class Stream class Stream
attr_accessor :type attr_accessor :type
def initialize(type) def initialize(type)
@type = type @type = type
end
end end
end
# @return [Proc] { |instance| marshalled(instance) } # @return [Proc] { |instance| marshalled(instance) }
def marshal_proc def marshal_proc
proc { |o| o.class.method(marshal_method).call(o).to_s } proc { |o| o.class.method(marshal_method).call(o).to_s }
end end
# @param [:input, :output] target determines whether to produce the an # @param [:input, :output] target determines whether to produce the an
# unmarshal Proc for the rpc input parameter or # unmarshal Proc for the rpc input parameter or
# its output parameter # its output parameter
# #
# @return [Proc] An unmarshal proc { |marshalled(instance)| instance } # @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
def unmarshal_proc(target) def unmarshal_proc(target)
fail ArgumentError unless [:input, :output].include?(target) fail ArgumentError unless [:input, :output].include?(target)
unmarshal_class = method(target).call unmarshal_class = method(target).call
unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
proc { |o| unmarshal_class.method(unmarshal_method).call(o) } proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end end
def run_server_method(active_call, mth) def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline # While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a # might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError. # well-behaved handler could throw a StatusError.
if request_response? if request_response?
req = active_call.remote_read req = active_call.remote_read
resp = mth.call(req, active_call.single_req_view) resp = mth.call(req, active_call.single_req_view)
active_call.remote_send(resp) active_call.remote_send(resp)
elsif client_streamer? elsif client_streamer?
resp = mth.call(active_call.multi_req_view) resp = mth.call(active_call.multi_req_view)
active_call.remote_send(resp) active_call.remote_send(resp)
elsif server_streamer? elsif server_streamer?
req = active_call.remote_read req = active_call.remote_read
replys = mth.call(req, active_call.single_req_view) replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) } replys.each { |r| active_call.remote_send(r) }
else # is a bidi_stream else # is a bidi_stream
active_call.run_server_bidi(mth) active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK')
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue Core::EventError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end end
send_status(active_call, OK, 'OK')
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue Core::EventError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
def assert_arity_matches(mth) def assert_arity_matches(mth)
if request_response? || server_streamer? if request_response? || server_streamer?
if mth.arity != 2 if mth.arity != 2
fail arity_error(mth, 2, "should be #{mth.name}(req, call)") fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
end end
else else
if mth.arity != 1 if mth.arity != 1
fail arity_error(mth, 1, "should be #{mth.name}(call)") fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
end end
end end
end
def request_response? def request_response?
!input.is_a?(Stream) && !output.is_a?(Stream) !input.is_a?(Stream) && !output.is_a?(Stream)
end end
def client_streamer? def client_streamer?
input.is_a?(Stream) && !output.is_a?(Stream) input.is_a?(Stream) && !output.is_a?(Stream)
end end
def server_streamer? def server_streamer?
!input.is_a?(Stream) && output.is_a?(Stream) !input.is_a?(Stream) && output.is_a?(Stream)
end end
def bidi_streamer? def bidi_streamer?
input.is_a?(Stream) && output.is_a?(Stream) input.is_a?(Stream) && output.is_a?(Stream)
end end
def arity_error(mth, want, msg) def arity_error(mth, want, msg)
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end end
def send_status(active_client, code, details) def send_status(active_client, code, details)
details = 'Not sure why' if details.nil? details = 'Not sure why' if details.nil?
active_client.send_status(code, details) active_client.send_status(code, details)
rescue StandardError => e rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}") logger.warn("Could not send status #{code}:#{details}")
logger.warn(e) logger.warn(e)
end
end end
end end
end end

@ -33,372 +33,370 @@ require 'grpc/generic/service'
require 'thread' require 'thread'
require 'xray/thread_dump_signal_handler' require 'xray/thread_dump_signal_handler'
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC # RpcServer hosts a number of services and makes them available on the
# RpcServer hosts a number of services and makes them available on the # network.
# network. class RpcServer
class RpcServer include Core::CompletionType
include Core::CompletionType include Core::TimeConsts
include Core::TimeConsts extend ::Forwardable
extend ::Forwardable
def_delegators :@server, :add_http2_port
def_delegators :@server, :add_http2_port
# Default thread pool size is 3
# Default thread pool size is 3 DEFAULT_POOL_SIZE = 3
DEFAULT_POOL_SIZE = 3
# Default max_waiting_requests size is 20
# Default max_waiting_requests size is 20 DEFAULT_MAX_WAITING_REQUESTS = 20
DEFAULT_MAX_WAITING_REQUESTS = 20
# Creates a new RpcServer.
# Creates a new RpcServer. #
# # The RPC server is configured using keyword arguments.
# The RPC server is configured using keyword arguments. #
# # There are some specific keyword args used to configure the RpcServer
# There are some specific keyword args used to configure the RpcServer # instance, however other arbitrary are allowed and when present are used
# instance, however other arbitrary are allowed and when present are used # to configure the listeninng connection set up by the RpcServer.
# to configure the listeninng connection set up by the RpcServer. #
# # * server_override: which if passed must be a [GRPC::Core::Server]. When
# * server_override: which if passed must be a [GRPC::Core::Server]. When # present.
# present. #
# # * poll_period: when present, the server polls for new events with this
# * poll_period: when present, the server polls for new events with this # period
# period #
# # * pool_size: the size of the thread pool the server uses to run its
# * pool_size: the size of the thread pool the server uses to run its # threads
# threads #
# # * completion_queue_override: when supplied, this will be used as the
# * completion_queue_override: when supplied, this will be used as the # completion_queue that the server uses to receive network events,
# completion_queue that the server uses to receive network events, # otherwise its creates a new instance itself
# otherwise its creates a new instance itself #
# # * creds: [GRPC::Core::ServerCredentials]
# * creds: [GRPC::Core::ServerCredentials] # the credentials used to secure the server
# the credentials used to secure the server #
# # * max_waiting_requests: the maximum number of requests that are not
# * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds
# being handled to allow. When this limit is exceeded, the server responds # with not available to new requests
# with not available to new requests def initialize(pool_size:DEFAULT_POOL_SIZE,
def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:INFINITE_FUTURE,
poll_period:INFINITE_FUTURE, completion_queue_override:nil,
completion_queue_override:nil, creds:nil,
creds:nil, server_override:nil,
server_override:nil, **kw)
**kw) if completion_queue_override.nil?
if completion_queue_override.nil? cq = Core::CompletionQueue.new
cq = Core::CompletionQueue.new else
else cq = completion_queue_override
cq = completion_queue_override unless cq.is_a? Core::CompletionQueue
unless cq.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue')
fail(ArgumentError, 'not a CompletionQueue')
end
end end
@cq = cq end
@cq = cq
if server_override.nil? if server_override.nil?
if creds.nil? if creds.nil?
srv = Core::Server.new(@cq, kw) srv = Core::Server.new(@cq, kw)
elsif !creds.is_a? Core::ServerCredentials elsif !creds.is_a? Core::ServerCredentials
fail(ArgumentError, 'not a ServerCredentials') fail(ArgumentError, 'not a ServerCredentials')
else
srv = Core::Server.new(@cq, kw, creds)
end
else else
srv = server_override srv = Core::Server.new(@cq, kw, creds)
fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
end end
@server = srv else
srv = server_override
@pool_size = pool_size fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@run_mutex = Mutex.new
@run_cond = ConditionVariable.new
@pool = Pool.new(@pool_size)
end end
@server = srv
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@run_mutex = Mutex.new
@run_cond = ConditionVariable.new
@pool = Pool.new(@pool_size)
end
# stops a running server # stops a running server
# #
# the call has no impact if the server is already stopped, otherwise # the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last. # server's current call loop is it's last.
def stop def stop
return unless @running return unless @running
@stopped = true @stopped = true
@pool.stop @pool.stop
end end
# determines if the server is currently running # determines if the server is currently running
def running? def running?
@running ||= false @running ||= false
end end
# Is called from other threads to wait for #run to start up the server. # Is called from other threads to wait for #run to start up the server.
# #
# If run has not been called, this returns immediately. # If run has not been called, this returns immediately.
# #
# @param timeout [Numeric] number of seconds to wait # @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise # @result [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = 0.1) def wait_till_running(timeout = 0.1)
end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
while Time.now < end_time while Time.now < end_time
@run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
sleep(sleep_period) sleep(sleep_period)
end
running?
end end
running?
end
# determines if the server is currently stopped # determines if the server is currently stopped
def stopped? def stopped?
@stopped ||= false @stopped ||= false
end end
# handle registration of classes # handle registration of classes
# #
# service is either a class that includes GRPC::GenericService and whose # service is either a class that includes GRPC::GenericService and whose
# #new function can be called without argument or any instance of such a # #new function can be called without argument or any instance of such a
# class. # class.
# #
# E.g, after # E.g, after
# #
# class Divider # class Divider
# include GRPC::GenericService # include GRPC::GenericService
# rpc :div DivArgs, DivReply # single request, single response # rpc :div DivArgs, DivReply # single request, single response
# def initialize(optional_arg='default option') # no args # def initialize(optional_arg='default option') # no args
# ... # ...
# end # end
# #
# srv = GRPC::RpcServer.new(...) # srv = GRPC::RpcServer.new(...)
# #
# # Either of these works # # Either of these works
# #
# srv.handle(Divider) # srv.handle(Divider)
# #
# # or # # or
# #
# srv.handle(Divider.new('replace optional arg')) # srv.handle(Divider.new('replace optional arg'))
# #
# It raises RuntimeError: # It raises RuntimeError:
# - if service is not valid service class or object # - if service is not valid service class or object
# - its handler methods are already registered # - its handler methods are already registered
# - if the server is already running # - if the server is already running
# #
# @param service [Object|Class] a service class or object as described # @param service [Object|Class] a service class or object as described
# above # above
def handle(service) def handle(service)
fail 'cannot add services if the server is running' if running? fail 'cannot add services if the server is running' if running?
fail 'cannot add services if the server is stopped' if stopped? fail 'cannot add services if the server is stopped' if stopped?
cls = service.is_a?(Class) ? service : service.class cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls) assert_valid_service_class(cls)
add_rpc_descs_for(service) add_rpc_descs_for(service)
end end
# runs the server # runs the server
# #
# - if no rpc_descs are registered, this exits immediately, otherwise it # - if no rpc_descs are registered, this exits immediately, otherwise it
# continues running permanently and does not return until program exit. # continues running permanently and does not return until program exit.
# #
# - #running? returns true after this is called, until #stop cause the # - #running? returns true after this is called, until #stop cause the
# the server to stop. # the server to stop.
def run def run
if rpc_descs.size == 0 if rpc_descs.size == 0
logger.warn('did not run as no services were present') logger.warn('did not run as no services were present')
return return
end end
@run_mutex.synchronize do @run_mutex.synchronize do
@running = true @running = true
@run_cond.signal @run_cond.signal
end
@pool.start
@server.start
server_tag = Object.new
until stopped?
@server.request_call(server_tag)
ev = @cq.pluck(server_tag, @poll_period)
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end end
@pool.start c = new_active_server_call(ev.call, ev.result)
@server.start unless c.nil?
server_tag = Object.new mth = ev.result.method.to_sym
until stopped? ev.close
@server.request_call(server_tag) @pool.schedule(c) do |call|
ev = @cq.pluck(server_tag, @poll_period) rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end
c = new_active_server_call(ev.call, ev.result)
unless c.nil?
mth = ev.result.method.to_sym
ev.close
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
end end
end end
@running = false
end end
@running = false
end
def new_active_server_call(call, new_server_rpc) def new_active_server_call(call, new_server_rpc)
# Accept the call. This is necessary even if a status is to be sent # Accept the call. This is necessary even if a status is to be sent
# back immediately # back immediately
finished_tag = Object.new finished_tag = Object.new
call_queue = Core::CompletionQueue.new call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata call.metadata = new_server_rpc.metadata # store the metadata
call.server_accept(call_queue, finished_tag) call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata call.server_end_initial_metadata
# Send UNAVAILABLE if there are too many unprocessed jobs # Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}") logger.info("waiting: #{jobs_count}, max: #{max}")
if @pool.jobs_waiting > @max_waiting_requests if @pool.jobs_waiting > @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
noop = proc { |x| x } noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop, c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline, new_server_rpc.deadline,
finished_tag: finished_tag) finished_tag: finished_tag)
c.send_status(StatusCodes::UNAVAILABLE, '') c.send_status(StatusCodes::UNAVAILABLE, '')
return nil return nil
end end
# Send NOT_FOUND if the method does not exist
mth = new_server_rpc.method.to_sym
unless rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{new_server_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '')
return nil
end
# Create the ActiveCall # Send NOT_FOUND if the method does not exist
rpc_desc = rpc_descs[mth] mth = new_server_rpc.method.to_sym
logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") unless rpc_descs.key?(mth)
ActiveCall.new(call, call_queue, logger.warn("NOT_FOUND: #{new_server_rpc}")
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), noop = proc { |x| x }
new_server_rpc.deadline, finished_tag: finished_tag) c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '')
return nil
end end
# Pool is a simple thread pool for running server requests. # Create the ActiveCall
class Pool rpc_desc = rpc_descs[mth]
def initialize(size) logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
fail 'pool size must be positive' unless size > 0 ActiveCall.new(call, call_queue,
@jobs = Queue.new rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
@size = size new_server_rpc.deadline, finished_tag: finished_tag)
@stopped = false end
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
end
# Returns the number of jobs waiting # Pool is a simple thread pool for running server requests.
def jobs_waiting class Pool
@jobs.size def initialize(size)
end fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
end
# Runs the given block on the queue with the provided args. # Returns the number of jobs waiting
# def jobs_waiting
# @param args the args passed blk when it is called @jobs.size
# @param blk the block to call end
def schedule(*args, &blk)
fail 'already stopped' if @stopped # Runs the given block on the queue with the provided args.
return if blk.nil? #
logger.info('schedule another job') # @param args the args passed blk when it is called
@jobs << [blk, args] # @param blk the block to call
end def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
end
# Starts running the jobs in the thread pool. # Starts running the jobs in the thread pool.
def start def start
fail 'already stopped' if @stopped fail 'already stopped' if @stopped
until @workers.size == @size.to_i until @workers.size == @size.to_i
next_thread = Thread.new do next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread catch(:exit) do # allows { throw :exit } to kill a thread
loop do loop do
begin begin
blk, args = @jobs.pop blk, args = @jobs.pop
blk.call(*args) blk.call(*args)
rescue StandardError => e rescue StandardError => e
logger.warn('Error in worker thread') logger.warn('Error in worker thread')
logger.warn(e) logger.warn(e)
end
end end
end end
end
# removes the threads from workers, and signal when all the # removes the threads from workers, and signal when all the
# threads are complete. # threads are complete.
@stop_mutex.synchronize do @stop_mutex.synchronize do
@workers.delete(Thread.current) @workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0 @stop_cond.signal if @workers.size == 0
end
end end
@workers << next_thread
end end
@workers << next_thread
end end
end
# Stops the jobs in the pool # Stops the jobs in the pool
def stop def stop
logger.info('stopping, will wait for all the workers to exit') logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } } @workers.size.times { schedule { throw :exit } }
@stopped = true @stopped = true
# TODO: allow configuration of the keepalive period # TODO: allow configuration of the keepalive period
keep_alive = 5 keep_alive = 5
@stop_mutex.synchronize do @stop_mutex.synchronize do
@stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
end end
# Forcibly shutdown any threads that are still alive. # Forcibly shutdown any threads that are still alive.
if @workers.size > 0 if @workers.size > 0
logger.warn("forcibly terminating #{@workers.size} worker(s)") logger.warn("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t| @workers.each do |t|
next unless t.alive? next unless t.alive?
begin begin
t.exit t.exit
rescue StandardError => e rescue StandardError => e
logger.warn('error while terminating a worker') logger.warn('error while terminating a worker')
logger.warn(e) logger.warn(e)
end
end end
end end
logger.info('stopped, all workers are shutdown')
end end
logger.info('stopped, all workers are shutdown')
end end
end
protected protected
def rpc_descs def rpc_descs
@rpc_descs ||= {} @rpc_descs ||= {}
end end
def rpc_handlers def rpc_handlers
@rpc_handlers ||= {} @rpc_handlers ||= {}
end end
private private
def assert_valid_service_class(cls) def assert_valid_service_class(cls)
unless cls.include?(GenericService) unless cls.include?(GenericService)
fail "#{cls} should 'include GenericService'" fail "#{cls} should 'include GenericService'"
end end
if cls.rpc_descs.size == 0 if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions" fail "#{cls} should specify some rpc descriptions"
end
cls.assert_rpc_descs_have_methods
end end
cls.assert_rpc_descs_have_methods
end
def add_rpc_descs_for(service) def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class cls = service.is_a?(Class) ? service : service.class
specs = rpc_descs specs = rpc_descs
handlers = rpc_handlers handlers = rpc_handlers
cls.rpc_descs.each_pair do |name, spec| cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym route = "/#{cls.service_name}/#{name}".to_sym
if specs.key? route if specs.key? route
fail "Cannot add rpc #{route} from #{spec}, already registered" fail "Cannot add rpc #{route} from #{spec}, already registered"
else
specs[route] = spec
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else else
specs[route] = spec handlers[route] = service.method(name.to_s.underscore.to_sym)
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
handlers[route] = service.method(name.to_s.underscore.to_sym)
end
logger.info("handling #{route} with #{handlers[route]}")
end end
logger.info("handling #{route} with #{handlers[route]}")
end end
end end
end end

@ -48,188 +48,186 @@ class String
end end
end end
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC # Provides behaviour used to implement schema-derived service classes.
# Provides behaviour used to implement schema-derived service classes. #
# # Is intended to be used to support both client and server
# Is intended to be used to support both client and server # IDL-schema-derived servers.
# IDL-schema-derived servers. module GenericService
module GenericService # Used to indicate that a name has already been specified
# Used to indicate that a name has already been specified class DuplicateRpcName < StandardError
class DuplicateRpcName < StandardError def initialize(name)
def initialize(name) super("rpc (#{name}) is already defined")
super("rpc (#{name}) is already defined")
end
end end
end
# Provides a simple DSL to describe RPC services. # Provides a simple DSL to describe RPC services.
#
# E.g, a Maths service that uses the serializable messages DivArgs,
# DivReply and Num might define its endpoint uses the following way:
#
# rpc :div DivArgs, DivReply # single request, single response
# rpc :sum stream(Num), Num # streamed input, single response
# rpc :fib FibArgs, stream(Num) # single request, streamed response
# rpc :div_many stream(DivArgs), stream(DivReply)
# # streamed req and resp
#
# Each 'rpc' adds an RpcDesc to classes including this module, and
# #assert_rpc_descs_have_methods is used to ensure the including class
# provides methods with signatures that support all the descriptors.
module Dsl
# This configures the method names that the serializable message
# implementation uses to marshal and unmarshal messages.
# #
# E.g, a Maths service that uses the serializable messages DivArgs, # - unmarshal_class method must be a class method on the serializable
# DivReply and Num might define its endpoint uses the following way: # message type that takes a string (byte stream) and produces and object
# #
# rpc :div DivArgs, DivReply # single request, single response # - marshal_class_method is called on a serializable message instance
# rpc :sum stream(Num), Num # streamed input, single response # and produces a serialized string.
# rpc :fib FibArgs, stream(Num) # single request, streamed response
# rpc :div_many stream(DivArgs), stream(DivReply)
# # streamed req and resp
# #
# Each 'rpc' adds an RpcDesc to classes including this module, and # The Dsl verifies that the types in the descriptor have both the
# #assert_rpc_descs_have_methods is used to ensure the including class # unmarshal and marshal methods.
# provides methods with signatures that support all the descriptors. attr_writer(:marshal_class_method, :unmarshal_class_method)
module Dsl
# This configures the method names that the serializable message
# implementation uses to marshal and unmarshal messages.
#
# - unmarshal_class method must be a class method on the serializable
# message type that takes a string (byte stream) and produces and object
#
# - marshal_class_method is called on a serializable message instance
# and produces a serialized string.
#
# The Dsl verifies that the types in the descriptor have both the
# unmarshal and marshal methods.
attr_writer(:marshal_class_method, :unmarshal_class_method)
# This allows configuration of the service name.
attr_accessor(:service_name)
# Adds an RPC spec.
#
# Takes the RPC name and the classes representing the types to be
# serialized, and adds them to the including classes rpc_desc hash.
#
# input and output should both have the methods #marshal and #unmarshal
# that are responsible for writing and reading an object instance from a
# byte buffer respectively.
#
# @param name [String] the name of the rpc
# @param input [Object] the input parameter's class
# @param output [Object] the output parameter's class
def rpc(name, input, output)
fail(DuplicateRpcName, name) if rpc_descs.key? name
assert_can_marshal(input)
assert_can_marshal(output)
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
end
def inherited(subclass) # This allows configuration of the service name.
# Each subclass should have a distinct class variable with its own attr_accessor(:service_name)
# rpc_descs
subclass.rpc_descs.merge!(rpc_descs)
subclass.service_name = service_name
end
# the name of the instance method used to marshal events to a byte # Adds an RPC spec.
# stream. #
def marshal_class_method # Takes the RPC name and the classes representing the types to be
@marshal_class_method ||= :marshal # serialized, and adds them to the including classes rpc_desc hash.
end #
# input and output should both have the methods #marshal and #unmarshal
# that are responsible for writing and reading an object instance from a
# byte buffer respectively.
#
# @param name [String] the name of the rpc
# @param input [Object] the input parameter's class
# @param output [Object] the output parameter's class
def rpc(name, input, output)
fail(DuplicateRpcName, name) if rpc_descs.key? name
assert_can_marshal(input)
assert_can_marshal(output)
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
end
# the name of the class method used to unmarshal from a byte stream. def inherited(subclass)
def unmarshal_class_method # Each subclass should have a distinct class variable with its own
@unmarshal_class_method ||= :unmarshal # rpc_descs
end subclass.rpc_descs.merge!(rpc_descs)
subclass.service_name = service_name
end
def assert_can_marshal(cls) # the name of the instance method used to marshal events to a byte
cls = cls.type if cls.is_a? RpcDesc::Stream # stream.
mth = unmarshal_class_method def marshal_class_method
unless cls.methods.include? mth @marshal_class_method ||= :marshal
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") end
end
mth = marshal_class_method # the name of the class method used to unmarshal from a byte stream.
return if cls.methods.include? mth def unmarshal_class_method
@unmarshal_class_method ||= :unmarshal
end
def assert_can_marshal(cls)
cls = cls.type if cls.is_a? RpcDesc::Stream
mth = unmarshal_class_method
unless cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end end
mth = marshal_class_method
return if cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
# @param cls [Class] the class of a serializable type # @param cls [Class] the class of a serializable type
# @return cls wrapped in a RpcDesc::Stream # @return cls wrapped in a RpcDesc::Stream
def stream(cls) def stream(cls)
assert_can_marshal(cls) assert_can_marshal(cls)
RpcDesc::Stream.new(cls) RpcDesc::Stream.new(cls)
end end
# the RpcDescs defined for this GenericService, keyed by name. # the RpcDescs defined for this GenericService, keyed by name.
def rpc_descs def rpc_descs
@rpc_descs ||= {} @rpc_descs ||= {}
end end
# Creates a rpc client class with methods for accessing the methods # Creates a rpc client class with methods for accessing the methods
# currently in rpc_descs. # currently in rpc_descs.
def rpc_stub_class def rpc_stub_class
descs = rpc_descs descs = rpc_descs
route_prefix = service_name route_prefix = service_name
Class.new(ClientStub) do Class.new(ClientStub) do
# @param host [String] the host the stub connects to # @param host [String] the host the stub connects to
# @param kw [KeywordArgs] the channel arguments, plus any optional # @param kw [KeywordArgs] the channel arguments, plus any optional
# args for configuring the client's channel # args for configuring the client's channel
def initialize(host, **kw) def initialize(host, **kw)
super(host, Core::CompletionQueue.new, **kw) super(host, Core::CompletionQueue.new, **kw)
end end
# Used define_method to add a method for each rpc_desc. Each method # Used define_method to add a method for each rpc_desc. Each method
# calls the base class method for the given descriptor. # calls the base class method for the given descriptor.
descs.each_pair do |name, desc| descs.each_pair do |name, desc|
mth_name = name.to_s.underscore.to_sym mth_name = name.to_s.underscore.to_sym
marshal = desc.marshal_proc marshal = desc.marshal_proc
unmarshal = desc.unmarshal_proc(:output) unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}" route = "/#{route_prefix}/#{name}"
if desc.request_response? if desc.request_response?
define_method(mth_name) do |req, deadline = nil| define_method(mth_name) do |req, deadline = nil|
logger.debug("calling #{@host}:#{route}") logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline) request_response(route, req, marshal, unmarshal, deadline)
end end
elsif desc.client_streamer? elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil| define_method(mth_name) do |reqs, deadline = nil|
logger.debug("calling #{@host}:#{route}") logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline) client_streamer(route, reqs, marshal, unmarshal, deadline)
end end
elsif desc.server_streamer? elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, &blk| define_method(mth_name) do |req, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}") logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, server_streamer(route, req, marshal, unmarshal, deadline,
&blk) &blk)
end end
else # is a bidi_stream else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, &blk| define_method(mth_name) do |reqs, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}") logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk) bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
end
end end
end end
end end
end end
end
# Asserts that the appropriate methods are defined for each added rpc # Asserts that the appropriate methods are defined for each added rpc
# spec. Is intended to aid verifying that server classes are correctly # spec. Is intended to aid verifying that server classes are correctly
# implemented. # implemented.
def assert_rpc_descs_have_methods def assert_rpc_descs_have_methods
rpc_descs.each_pair do |m, spec| rpc_descs.each_pair do |m, spec|
mth_name = m.to_s.underscore.to_sym mth_name = m.to_s.underscore.to_sym
unless instance_methods.include?(mth_name) unless instance_methods.include?(mth_name)
fail "#{self} does not provide instance method '#{mth_name}'" fail "#{self} does not provide instance method '#{mth_name}'"
end
spec.assert_arity_matches(instance_method(mth_name))
end end
spec.assert_arity_matches(instance_method(mth_name))
end end
end end
end
def self.included(o) def self.included(o)
o.extend(Dsl) o.extend(Dsl)
# Update to the use the service name including module. Proivde a default # Update to the use the service name including module. Proivde a default
# that can be nil e,g. when modules are declared dynamically. # that can be nil e,g. when modules are declared dynamically.
return unless o.service_name.nil? return unless o.service_name.nil?
if o.name.nil? if o.name.nil?
o.service_name = 'GenericService' o.service_name = 'GenericService'
else
modules = o.name.split('::')
if modules.length > 2
o.service_name = modules[modules.length - 2]
else else
modules = o.name.split('::') o.service_name = modules.first
if modules.length > 2
o.service_name = modules[modules.length - 2]
else
o.service_name = modules.first
end
end end
end end
end end

@ -35,6 +35,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging # TODO: provide command-line configuration for logging
Logging.logger['Google::RPC'].level = :debug Logging.logger['GRPC'].level = :debug
Logging.logger['Google::RPC::ActiveCall'].level = :info Logging.logger['GRPC::ActiveCall'].level = :info
Logging.logger['Google::RPC::BidiCall'].level = :info Logging.logger['GRPC::BidiCall'].level = :info

@ -27,9 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
module Google # GRPC contains the General RPC module.
# Google::RPC contains the General RPC module. module GRPC
module RPC VERSION = '0.0.1'
VERSION = '0.0.1'
end
end end

@ -36,9 +36,9 @@ require 'faraday'
require 'grpc/auth/compute_engine' require 'grpc/auth/compute_engine'
require 'spec_helper' require 'spec_helper'
describe Google::RPC::Auth::GCECredentials do describe GRPC::Auth::GCECredentials do
MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token' MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token'
GCECredentials = Google::RPC::Auth::GCECredentials GCECredentials = GRPC::Auth::GCECredentials
before(:example) do before(:example) do
@client = GCECredentials.new @client = GCECredentials.new

@ -38,7 +38,7 @@ require 'multi_json'
require 'openssl' require 'openssl'
require 'spec_helper' require 'spec_helper'
describe Google::RPC::Auth::ServiceAccountCredentials do describe GRPC::Auth::ServiceAccountCredentials do
before(:example) do before(:example) do
@key = OpenSSL::PKey::RSA.new(2048) @key = OpenSSL::PKey::RSA.new(2048)
cred_json = { cred_json = {
@ -49,7 +49,7 @@ describe Google::RPC::Auth::ServiceAccountCredentials do
type: 'service_account' type: 'service_account'
} }
cred_json_text = MultiJson.dump(cred_json) cred_json_text = MultiJson.dump(cred_json)
@client = Google::RPC::Auth::ServiceAccountCredentials.new( @client = GRPC::Auth::ServiceAccountCredentials.new(
'https://www.googleapis.com/auth/userinfo.profile', 'https://www.googleapis.com/auth/userinfo.profile',
StringIO.new(cred_json_text)) StringIO.new(cred_json_text))
end end

Loading…
Cancel
Save