Merge pull request #6662 from murgatroid99/ruby_explicit_kw_args

Avoid using unspecified keyword args where possible
pull/6703/head
Jan Tattermusch 9 years ago
commit 6db0232c3a
  1. 5
      src/ruby/lib/grpc/errors.rb
  2. 65
      src/ruby/lib/grpc/generic/active_call.rb
  3. 85
      src/ruby/lib/grpc/generic/client_stub.rb
  4. 8
      src/ruby/lib/grpc/generic/rpc_desc.rb
  5. 19
      src/ruby/lib/grpc/generic/rpc_server.rb
  6. 16
      src/ruby/lib/grpc/generic/service.rb
  7. 7
      src/ruby/pb/test/client.rb
  8. 4
      src/ruby/qps/client.rb
  9. 5
      src/ruby/qps/server.rb
  10. 5
      src/ruby/spec/generic/active_call_spec.rb
  11. 51
      src/ruby/spec/generic/client_stub_spec.rb
  12. 22
      src/ruby/spec/generic/rpc_desc_spec.rb
  13. 103
      src/ruby/spec/generic/rpc_server_spec.rb
  14. 8
      src/ruby/spec/pb/health/checker_spec.rb

@ -40,11 +40,12 @@ module GRPC
# @param code [Numeric] the status code
# @param details [String] the details of the exception
def initialize(code, details = 'unknown cause', **kw)
# @param metadata [Hash] the error's metadata
def initialize(code, details = 'unknown cause', metadata = {})
super("#{code}:#{details}")
@code = code
@details = details
@metadata = kw
@metadata = metadata
end
# Converts the exception to a GRPC::Status for use in the networking

@ -44,7 +44,7 @@ class Struct
# raise BadStatus, propagating the metadata if present.
md = status.metadata
with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys)
fail GRPC::BadStatus.new(status.code, status.details, with_sym_keys)
end
status
end
@ -76,14 +76,15 @@ module GRPC
#
# @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] the completion queue
def self.client_invoke(call, q, **kw)
# @param metadata [Hash] the metadata
def self.client_invoke(call, q, metadata = {})
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(TypeError, '!Core::CompletionQueue')
end
metadata_tag = Object.new
call.run_batch(q, metadata_tag, INFINITE_FUTURE,
SEND_INITIAL_METADATA => kw)
SEND_INITIAL_METADATA => metadata)
metadata_tag
end
@ -211,13 +212,12 @@ module GRPC
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
def send_status(code = OK, details = '', assert_finished = false, **kw)
# @param metadata [Hash] metadata to send to the server. If a value is a
# list, mulitple metadata for its key are sent
def send_status(code = OK, details = '', assert_finished = false,
metadata: {})
ops = {
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(@cq, self, INFINITE_FUTURE, ops)
@ -311,14 +311,12 @@ module GRPC
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, **kw)
start_call(**kw) unless @started
def request_response(req, metadata: {})
start_call(metadata) unless @started
remote_send(req)
writes_done(false)
response = remote_read
@ -337,14 +335,12 @@ module GRPC
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, **kw)
start_call(**kw) unless @started
def client_streamer(requests, metadata: {})
start_call(metadata) unless @started
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
@ -365,15 +361,12 @@ module GRPC
# it is executed with each response as the argument and no result is
# returned.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
# any keyword arguments are treated as metadata to be sent to the server.
#
# @param req [Object] the request sent to the server
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, **kw)
start_call(**kw) unless @started
def server_streamer(req, metadata: {})
start_call(metadata) unless @started
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
@ -407,14 +400,12 @@ module GRPC
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
def bidi_streamer(requests, metadata: {}, &blk)
start_call(metadata) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
metadata_tag: @metadata_tag)
@metadata_tag = nil # run_on_client ensures metadata is read
@ -453,9 +444,11 @@ module GRPC
private
# Starts the call if not already started
def start_call(**kw)
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
def start_call(metadata = {})
return if @started
@metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw)
@metadata_tag = ActiveCall.client_invoke(@call, @cq, metadata)
@started = true
end

@ -44,21 +44,21 @@ module GRPC
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
def self.setup_channel(alt_chan, host, creds, **kw)
def self.setup_channel(alt_chan, host, creds, channel_args = {})
unless alt_chan.nil?
fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
return alt_chan
end
if kw['grpc.primary_user_agent'].nil?
kw['grpc.primary_user_agent'] = ''
if channel_args['grpc.primary_user_agent'].nil?
channel_args['grpc.primary_user_agent'] = ''
else
kw['grpc.primary_user_agent'] += ' '
channel_args['grpc.primary_user_agent'] += ' '
end
kw['grpc.primary_user_agent'] += "grpc-ruby/#{VERSION}"
channel_args['grpc.primary_user_agent'] += "grpc-ruby/#{VERSION}"
unless creds.is_a?(Core::ChannelCredentials) || creds.is_a?(Symbol)
fail(TypeError, '!ChannelCredentials or Symbol')
end
Core::Channel.new(host, kw, creds)
Core::Channel.new(host, channel_args, creds)
end
# Allows users of the stub to modify the propagate mask.
@ -96,15 +96,16 @@ module GRPC
# :this_channel_is_insecure
# @param channel_override [Core::Channel] a pre-created channel
# @param timeout [Number] the default timeout to use in requests
# @param kw [KeywordArgs]the channel arguments
# @param channel_args [Hash] the channel arguments
def initialize(host, q, creds,
channel_override: nil,
timeout: nil,
propagate_mask: nil,
**kw)
channel_args: {})
fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@ch = ClientStub.setup_channel(channel_override, host, creds, **kw)
alt_host = kw[Core::Channel::SSL_TARGET]
@ch = ClientStub.setup_channel(channel_override, host, creds,
channel_args)
alt_host = channel_args[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
@propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
@ -135,42 +136,35 @@ module GRPC
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param credentials [Core::CallCredentials] credentials to use when making
# the call
# @param return_op [true|false] return an Operation if true
# @param metadata [Hash] metadata to be sent to the server
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false,
parent: nil,
credentials: nil,
**kw)
metadata: {})
c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent,
credentials: credentials)
return c.request_response(req, **kw) unless return_op
return c.request_response(req, metadata: metadata) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, **kw)
c.request_response(req, metadata: metadata)
end
op
end
@ -205,42 +199,35 @@ module GRPC
#
# If return_op is true, the call returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param credentials [Core::CallCredentials] credentials to use when making
# the call
# @param metadata [Hash] metadata to be sent to the server
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false,
parent: nil,
credentials: nil,
**kw)
metadata: {})
c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent,
credentials: credentials)
return c.client_streamer(requests, **kw) unless return_op
return c.client_streamer(requests, metadata: metadata) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
op = c.operation
op.define_singleton_method(:execute) do
c.client_streamer(requests, **kw)
c.client_streamer(requests, metadata: metadata)
end
op
end
@ -292,35 +279,33 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false]return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param credentials [Core::CallCredentials] credentials to use when making
# the call
# @param metadata [Hash] metadata to be sent to the server
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false,
parent: nil,
credentials: nil,
**kw,
metadata: {},
&blk)
c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent,
credentials: credentials)
return c.server_streamer(req, **kw, &blk) unless return_op
return c.server_streamer(req, metadata: metadata, &blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.server_streamer(req, **kw, &blk)
c.server_streamer(req, metadata: metadata, &blk)
end
op
end
@ -391,11 +376,6 @@ module GRPC
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
@ -411,36 +391,35 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param credentials [Core::CallCredentials] credentials to use when making
# the call
# @param return_op [true|false] return an Operation if true
# @param metadata [Hash] metadata to be sent to the server
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false,
parent: nil,
credentials: nil,
**kw,
metadata: {},
&blk)
c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent,
credentials: credentials)
return c.bidi_streamer(requests, **kw, &blk) unless return_op
return c.bidi_streamer(requests, metadata: metadata,
&blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.bidi_streamer(requests, **kw, &blk)
c.bidi_streamer(requests, metadata: metadata, &blk)
end
op
end
@ -457,12 +436,10 @@ module GRPC
# @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal,
deadline: nil,
timeout: nil,
parent: nil,
credentials: nil)
if deadline.nil?
deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
end
deadline = from_relative_time(@timeout) if deadline.nil?
# Provide each new client call with its own completion queue
call_queue = Core::CompletionQueue.new
call = @ch.create_call(call_queue,

@ -80,12 +80,12 @@ module GRPC
else # is a bidi_stream
active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK', **active_call.output_metadata)
send_status(active_call, OK, 'OK', active_call.output_metadata)
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details, **e.metadata)
send_status(active_call, e.code, e.details, e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
@ -135,10 +135,10 @@ module GRPC
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end
def send_status(active_client, code, details, **kw)
def send_status(active_client, code, details, metadata = {})
details = 'Not sure why' if details.nil?
GRPC.logger.debug("Sending status #{code}:#{details}")
active_client.send_status(code, details, code == OK, **kw)
active_client.send_status(code, details, code == OK, metadata: metadata)
rescue StandardError => e
GRPC.logger.warn("Could not send status #{code}:#{details}")
GRPC.logger.warn(e)

@ -169,14 +169,6 @@ module GRPC
alt_cq
end
# setup_srv is used by #initialize to constuct a Core::Server from its
# arguments.
def self.setup_srv(alt_srv, cq, **kw)
return Core::Server.new(cq, kw) if alt_srv.nil?
fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
alt_srv
end
# setup_connect_md_proc is used by #initialize to validate the
# connect_md_proc.
def self.setup_connect_md_proc(a_proc)
@ -193,9 +185,6 @@ module GRPC
# instance, however other arbitrary are allowed and when present are used
# to configure the listeninng connection set up by the RpcServer.
#
# * server_override: which if passed must be a [GRPC::Core::Server]. When
# present.
#
# * poll_period: when present, the server polls for new events with this
# period
#
@ -217,13 +206,15 @@ module GRPC
# when non-nil is a proc for determining metadata to to send back the client
# on receiving an invocation req. The proc signature is:
# {key: val, ..} func(method_name, {key: val, ...})
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil,
server_override:nil,
connect_md_proc:nil,
**kw)
server_args:{})
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@ -235,7 +226,7 @@ module GRPC
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
@server = RpcServer.setup_srv(server_override, @cq, **kw)
@server = Core::Server.new(@cq, server_args)
end
# stops a running server

@ -179,24 +179,24 @@ module GRPC
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, **kw|
define_method(mth_name) do |req, metadata = {}|
GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, **kw)
request_response(route, req, marshal, unmarshal, metadata)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, **kw|
define_method(mth_name) do |reqs, metadata = {}|
GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, **kw)
client_streamer(route, reqs, marshal, unmarshal, metadata)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, **kw, &blk|
define_method(mth_name) do |req, metadata = {}, &blk|
GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, **kw, &blk)
server_streamer(route, req, marshal, unmarshal, metadata, &blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, **kw, &blk|
define_method(mth_name) do |reqs, metadata = {}, &blk|
GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, **kw, &blk)
bidi_streamer(route, reqs, marshal, unmarshal, metadata, &blk)
end
end
end

@ -114,7 +114,9 @@ def create_stub(opts)
if opts.secure
creds = ssl_creds(opts.use_test_ca)
stub_opts = {
GRPC::Core::Channel::SSL_TARGET => opts.host_override
channel_args: {
GRPC::Core::Channel::SSL_TARGET => opts.host_override
}
}
# Add service account creds if specified
@ -315,7 +317,8 @@ class NamedTests
def timeout_on_sleeping_server
msg_sizes = [[27_182, 31_415]]
ppp = PingPongPlayer.new(msg_sizes)
resps = @stub.full_duplex_call(ppp.each_item, timeout: 0.001)
deadline = GRPC::Core::TimeConsts::from_relative_time(0.001)
resps = @stub.full_duplex_call(ppp.each_item, deadline: deadline)
resps.each { |r| ppp.queue.push(r) }
fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
rescue GRPC::BadStatus => e

@ -66,8 +66,10 @@ class BenchmarkClient
cred = GRPC::Core::ChannelCredentials.new()
end
if config.security_params.server_host_override
opts[GRPC::Core::Channel::SSL_TARGET] =
channel_args = {}
channel_args[GRPC::Core::Channel::SSL_TARGET] =
config.security_params.server_host_override
opts[:channel_args] = channel_args
end
else
cred = :this_channel_is_insecure

@ -75,13 +75,14 @@ class BenchmarkServer
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
Thread.new {
t = Thread.new {
@server.run
}
t.abort_on_exception
end
def mark(reset)
s = Grpc::Testing::ServerStats.new(time_elapsed:
(Time.now-@start_time).to_f)
(Time.now-@start_time).to_f)
@start_time = Time.now if reset
s
end

@ -159,9 +159,10 @@ describe GRPC::ActiveCall do
end
describe '#client_invoke' do
it 'sends keywords as metadata to the server when the are present' do
it 'sends metadata to the server when present' do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue, k1: 'v1', k2: 'v2')
metadata = { k1: 'v1', k2: 'v2' }
ActiveCall.client_invoke(call, @client_queue, metadata)
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil

@ -68,15 +68,7 @@ describe 'ClientStub' do
describe '#new' do
let(:fake_host) { 'localhost:0' }
it 'can be created from a host and args' do
opts = { a_channel_arg: 'an_arg' }
blk = proc do
GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
opts = { a_channel_arg: 'an_arg', deadline: 5 }
opts = { channel_args: { a_channel_arg: 'an_arg' } }
blk = proc do
GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
end
@ -84,7 +76,10 @@ describe 'ClientStub' do
end
it 'can be created with an channel override' do
opts = { a_channel_arg: 'an_arg', channel_override: @ch }
opts = {
channel_args: { a_channel_arg: 'an_arg' },
channel_override: @ch
}
blk = proc do
GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
end
@ -93,7 +88,10 @@ describe 'ClientStub' do
it 'cannot be created with a bad channel override' do
blk = proc do
opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
opts = {
channel_args: { a_channel_arg: 'an_arg' },
channel_override: Object.new
}
GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
end
expect(&blk).to raise_error
@ -101,7 +99,7 @@ describe 'ClientStub' do
it 'cannot be created with bad credentials' do
blk = proc do
opts = { a_channel_arg: 'an_arg' }
opts = { channel_args: { a_channel_arg: 'an_arg' } }
GRPC::ClientStub.new(fake_host, @cq, Object.new, **opts)
end
expect(&blk).to raise_error
@ -111,8 +109,10 @@ describe 'ClientStub' do
certs = load_test_certs
blk = proc do
opts = {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
a_channel_arg: 'an_arg'
channel_args: {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
a_channel_arg: 'an_arg'
}
}
creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
GRPC::ClientStub.new(fake_host, @cq, creds, **opts)
@ -172,7 +172,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
stub.request_response(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
metadata: { k1: 'v1', k2: 'v2' })
end
it_behaves_like 'request response'
@ -181,7 +181,8 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
return_op: true,
metadata: { k1: 'v1', k2: 'v2' })
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
end
@ -196,7 +197,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
@options = { k1: 'v1', k2: 'v2' }
@metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
@ -208,7 +209,7 @@ describe 'ClientStub' do
end
it 'should send metadata to the server ok' do
th = run_client_streamer(@sent_msgs, @resp, @pass, @options)
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
expect(get_response(@stub)).to eq(@resp)
th.join
end
@ -221,7 +222,7 @@ describe 'ClientStub' do
end
it 'should raise ArgumentError if metadata contains invalid values' do
@options.merge!(k3: 3)
@metadata.merge!(k3: 3)
expect do
get_response(@stub)
end.to raise_error(ArgumentError,
@ -231,7 +232,8 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
stub.client_streamer(@method, @sent_msgs, noop, noop, @options)
stub.client_streamer(@method, @sent_msgs, noop, noop,
metadata: @metadata)
end
it_behaves_like 'client streaming'
@ -240,7 +242,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
@options.merge(return_op: true))
return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
end
@ -290,7 +292,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
e = stub.server_streamer(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
metadata: { k1: 'v1', k2: 'v2' })
expect(e).to be_a(Enumerator)
e
end
@ -301,7 +303,8 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
return_op: true,
metadata: { k1: 'v1', k2: 'v2' })
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
expect(e).to be_a(Enumerator)
@ -383,7 +386,7 @@ describe 'ClientStub' do
stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
blk = proc do
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
timeout: 0.001)
deadline: from_relative_time(0.001))
e.collect { |r| r }
end
expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/

@ -56,14 +56,14 @@ describe GRPC::RpcDesc do
it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
{})
metadata: {})
this_desc.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false, {})
false, metadata: {})
this_desc.run_server_method(@call, method(:other_error))
end
@ -93,7 +93,7 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
**fake_md)
metadata: fake_md)
this_desc.run_server_method(@call, method(:fake_reqresp))
end
end
@ -106,13 +106,13 @@ describe GRPC::RpcDesc do
it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
{})
metadata: {})
@client_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false, {})
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
false, metadata: {})
@client_streamer.run_server_method(@call, method(:other_error_alt))
end
@ -128,7 +128,7 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
**fake_md)
metadata: fake_md)
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
@ -148,7 +148,7 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
**fake_md)
metadata: fake_md)
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
@ -165,14 +165,14 @@ describe GRPC::RpcDesc do
e = GRPC::BadStatus.new(@bs_code, 'NOK')
expect(@call).to receive(:run_server_bidi).and_raise(e)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
{})
metadata: {})
@bidi_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
false, {})
false, metadata: {})
@bidi_streamer.run_server_method(@call, method(:other_error_alt))
end
@ -180,7 +180,7 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:run_server_bidi)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
**fake_md)
metadata: fake_md)
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end

@ -99,7 +99,7 @@ class FailingService
end
def an_rpc(_req, _call)
fail GRPC::BadStatus.new(@code, @details, **@md)
fail GRPC::BadStatus.new(@code, @details, @md)
end
end
@ -137,24 +137,11 @@ describe GRPC::RpcServer do
@noop = proc { |x| x }
@server_queue = GRPC::Core::CompletionQueue.new
server_host = '0.0.0.0:0'
@server = GRPC::Core::Server.new(@server_queue, nil)
server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
@host = "localhost:#{server_port}"
@ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
end
describe '#new' do
it 'can be created with just some args' do
opts = { a_channel_arg: 'an_arg' }
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
opts = { a_channel_arg: 'an_arg', deadline: 5 }
opts = { server_args: { a_channel_arg: 'an_arg' } }
blk = proc do
RpcServer.new(**opts)
end
@ -163,7 +150,7 @@ describe GRPC::RpcServer do
it 'can be created with a completion queue override' do
opts = {
a_channel_arg: 'an_arg',
server_args: { a_channel_arg: 'an_arg' },
completion_queue_override: @server_queue
}
blk = proc do
@ -175,7 +162,7 @@ describe GRPC::RpcServer do
it 'cannot be created with a bad completion queue override' do
blk = proc do
opts = {
a_channel_arg: 'an_arg',
server_args: { a_channel_arg: 'an_arg' },
completion_queue_override: Object.new
}
RpcServer.new(**opts)
@ -186,38 +173,20 @@ describe GRPC::RpcServer do
it 'cannot be created with invalid ServerCredentials' do
blk = proc do
opts = {
a_channel_arg: 'an_arg',
server_args: { a_channel_arg: 'an_arg' },
creds: Object.new
}
RpcServer.new(**opts)
end
expect(&blk).to raise_error
end
it 'can be created with a server override' do
opts = { a_channel_arg: 'an_arg', server_override: @server }
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad server override' do
blk = proc do
opts = {
a_channel_arg: 'an_arg',
server_override: Object.new
}
RpcServer.new(**opts)
end
expect(&blk).to raise_error
end
end
describe '#stopped?' do
before(:each) do
opts = { a_channel_arg: 'an_arg', poll_period: 1.5 }
opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 }
@srv = RpcServer.new(**opts)
@srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
it 'starts out false' do
@ -245,28 +214,30 @@ describe GRPC::RpcServer do
describe '#running?' do
it 'starts out false' do
opts = { a_channel_arg: 'an_arg', server_override: @server }
opts = {
server_args: { a_channel_arg: 'an_arg' }
}
r = RpcServer.new(**opts)
expect(r.running?).to be(false)
end
it 'is false if run is called with no services registered', server: true do
opts = {
a_channel_arg: 'an_arg',
poll_period: 2,
server_override: @server
server_args: { a_channel_arg: 'an_arg' },
poll_period: 2
}
r = RpcServer.new(**opts)
r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
expect { r.run }.to raise_error(RuntimeError)
end
it 'is true after run is called with a registered service' do
opts = {
a_channel_arg: 'an_arg',
poll_period: 2.5,
server_override: @server
server_args: { a_channel_arg: 'an_arg' },
poll_period: 2.5
}
r = RpcServer.new(**opts)
r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
r.handle(EchoService)
t = Thread.new { r.run }
r.wait_till_running
@ -278,8 +249,9 @@ describe GRPC::RpcServer do
describe '#handle' do
before(:each) do
@opts = { a_channel_arg: 'an_arg', poll_period: 1 }
@opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }
@srv = RpcServer.new(**@opts)
@srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
it 'raises if #run has already been called' do
@ -322,11 +294,13 @@ describe GRPC::RpcServer do
context 'with no connect_metadata' do
before(:each) do
server_opts = {
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@host = "localhost:#{server_port}"
@ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
end
it 'should return NOT_FOUND status on unknown methods', server: true do
@ -383,7 +357,8 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }))
.to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
check_md(wanted_md, service.received_md)
@srv.stop
@ -397,8 +372,11 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
timeout = service.delay + 1.0 # wait for long enough
resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2')
timeout = service.delay + 1.0
deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
resp = stub.an_rpc(req,
deadline: deadline,
metadata: { k1: 'v1', k2: 'v2' })
expect(resp).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
check_md(wanted_md, service.received_md)
@ -413,7 +391,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
Thread.new do # cancel the call
sleep 0.1
op.cancel
@ -443,8 +421,7 @@ describe GRPC::RpcServer do
it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
opts = {
a_channel_arg: 'an_arg',
server_override: @server,
server_args: { a_channel_arg: 'an_arg' },
completion_queue_override: @server_queue,
pool_size: 1,
poll_period: 1,
@ -452,6 +429,8 @@ describe GRPC::RpcServer do
}
alt_srv = RpcServer.new(**opts)
alt_srv.handle(SlowService)
alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
alt_host = "0.0.0.0:#{alt_port}"
t = Thread.new { alt_srv.run }
alt_srv.wait_till_running
req = EchoMsg.new
@ -460,7 +439,7 @@ describe GRPC::RpcServer do
one_failed_as_unavailable = false
n.times do
threads << Thread.new do
stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
stub = SlowStub.new(alt_host, :this_channel_is_insecure)
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
@ -487,12 +466,13 @@ describe GRPC::RpcServer do
end
before(:each) do
server_opts = {
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1,
connect_md_proc: test_md_proc
}
@srv = RpcServer.new(**server_opts)
alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@alt_host = "0.0.0.0:#{alt_port}"
end
it 'should send connect metadata to the client', server: true do
@ -501,8 +481,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
expect(op.metadata).to be nil
expect(op.execute).to be_a(EchoMsg)
wanted_md = {
@ -522,11 +502,12 @@ describe GRPC::RpcServer do
context 'with trailing metadata' do
before(:each) do
server_opts = {
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@alt_host = "0.0.0.0:#{alt_port}"
end
it 'should be added to BadStatus when requests fail', server: true do
@ -535,7 +516,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = FailingStub.new(@host, :this_channel_is_insecure, **client_opts)
stub = FailingStub.new(@alt_host, :this_channel_is_insecure)
blk = proc { stub.an_rpc(req) }
# confirm it raise the expected error
@ -560,8 +541,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })
expect(op.metadata).to be nil
expect(op.execute).to be_a(EchoMsg)
expect(op.metadata).to eq(wanted_trailers)

@ -170,17 +170,15 @@ describe Grpc::Health::Checker do
before(:each) do
@server_queue = GRPC::Core::CompletionQueue.new
server_host = '0.0.0.0:0'
@server = GRPC::Core::Server.new(@server_queue, nil)
server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
@host = "localhost:#{server_port}"
@ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
@client_opts = { channel_override: @ch }
server_opts = {
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
server_port = @srv.add_http2_port(server_host, :this_port_is_insecure)
@host = "localhost:#{server_port}"
@ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
end
after(:each) do

Loading…
Cancel
Save