Merge pull request #82 from tbetbetbe/grpc_ruby_add_rubocop_fix_lint_style_issues

Grpc ruby add rubocop fix lint style issues
pull/89/head
Michael Lumish 10 years ago
commit 861c79b08a
  1. 10
      src/ruby/.rubocop.yml
  2. 52
      src/ruby/.rubocop_todo.yml
  3. 32
      src/ruby/Rakefile
  4. 53
      src/ruby/bin/interop/interop_client.rb
  5. 35
      src/ruby/bin/interop/interop_server.rb
  6. 25
      src/ruby/bin/math_client.rb
  7. 42
      src/ruby/bin/math_server.rb
  8. 12
      src/ruby/bin/noproto_client.rb
  9. 16
      src/ruby/bin/noproto_server.rb
  10. 6
      src/ruby/ext/grpc/extconf.rb
  11. 27
      src/ruby/grpc.gemspec
  12. 9
      src/ruby/lib/grpc/beefcake.rb
  13. 7
      src/ruby/lib/grpc/core/event.rb
  14. 14
      src/ruby/lib/grpc/core/time_consts.rb
  15. 7
      src/ruby/lib/grpc/errors.rb
  16. 97
      src/ruby/lib/grpc/generic/active_call.rb
  17. 33
      src/ruby/lib/grpc/generic/bidi_call.rb
  18. 60
      src/ruby/lib/grpc/generic/client_stub.rb
  19. 43
      src/ruby/lib/grpc/generic/rpc_desc.rb
  20. 106
      src/ruby/lib/grpc/generic/rpc_server.rb
  21. 58
      src/ruby/lib/grpc/generic/service.rb
  22. 1
      src/ruby/lib/grpc/version.rb
  23. 2
      src/ruby/spec/alloc_spec.rb
  24. 4
      src/ruby/spec/byte_buffer_spec.rb
  25. 40
      src/ruby/spec/call_spec.rb
  26. 35
      src/ruby/spec/channel_spec.rb
  27. 74
      src/ruby/spec/client_server_spec.rb
  28. 5
      src/ruby/spec/completion_queue_spec.rb
  29. 14
      src/ruby/spec/credentials_spec.rb
  30. 22
      src/ruby/spec/event_spec.rb
  31. 48
      src/ruby/spec/generic/active_call_spec.rb
  32. 127
      src/ruby/spec/generic/client_stub_spec.rb
  33. 117
      src/ruby/spec/generic/rpc_desc_spec.rb
  34. 35
      src/ruby/spec/generic/rpc_server_pool_spec.rb
  35. 139
      src/ruby/spec/generic/rpc_server_spec.rb
  36. 58
      src/ruby/spec/generic/service_spec.rb
  37. 2
      src/ruby/spec/metadata_spec.rb
  38. 4
      src/ruby/spec/port_picker.rb
  39. 13
      src/ruby/spec/server_credentials_spec.rb
  40. 48
      src/ruby/spec/server_spec.rb
  41. 4
      src/ruby/spec/time_consts_spec.rb

@ -0,0 +1,10 @@
# This is the configuration used to check the rubocop source code.
inherit_from: .rubocop_todo.yml
AllCops:
Exclude:
- 'bin/apis/**/*'
- 'bin/interop/test/**/*'
- 'bin/math.rb'
- 'bin/math_services.rb'

@ -0,0 +1,52 @@
# This configuration was generated by `rubocop --auto-gen-config`
# on 2015-01-16 02:30:04 -0800 using RuboCop version 0.28.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.
# Offense count: 3
# Lint/UselessAssignment:
# Enabled: false
# Offense count: 33
Metrics/AbcSize:
Max: 39
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
Max: 231
# Offense count: 2
Metrics/CyclomaticComplexity:
Max: 8
# Offense count: 36
# Configuration parameters: CountComments.
Metrics/MethodLength:
Max: 37
# Offense count: 8
# Configuration parameters: CountKeywordArgs.
Metrics/ParameterLists:
Max: 8
# Offense count: 2
Metrics/PerceivedComplexity:
Max: 10
# Offense count: 7
# Configuration parameters: AllowedVariables.
Style/GlobalVars:
Enabled: false
# Offense count: 1
# Configuration parameters: EnforcedStyle, MinBodyLength, SupportedStyles.
Style/Next:
Enabled: false
# Offense count: 2
# Configuration parameters: Methods.
Style/SingleLineBlockParams:
Enabled: false

@ -1,46 +1,44 @@
# -*- ruby -*-
require 'rake/extensiontask'
require 'rspec/core/rake_task'
require 'rubocop/rake_task'
desc 'Run Rubocop to check for style violations'
RuboCop::RakeTask.new
Rake::ExtensionTask.new 'grpc' do |ext|
ext.lib_dir = File.join('lib', 'grpc')
end
SPEC_SUITES = [
{ :id => :wrapper, :title => 'wrapper layer', :files => %w(spec/*.rb) },
{ :id => :idiomatic, :title => 'idiomatic layer', :dir => %w(spec/generic),
:tag => '~bidi' },
{ :id => :bidi, :title => 'bidi tests', :dir => %w(spec/generic),
:tag => 'bidi' }
{ id: :wrapper, title: 'wrapper layer', files: %w(spec/*.rb) },
{ id: :idiomatic, title: 'idiomatic layer', dir: %w(spec/generic),
tag: '~bidi' },
{ id: :bidi, title: 'bidi tests', dir: %w(spec/generic),
tag: 'bidi' }
]
desc "Run all RSpec tests"
desc 'Run all RSpec tests'
namespace :spec do
namespace :suite do
SPEC_SUITES.each do |suite|
desc "Run all specs in #{suite[:title]} spec suite"
RSpec::Core::RakeTask.new(suite[:id]) do |t|
spec_files = []
if suite[:files]
suite[:files].each { |f| spec_files += Dir[f] }
end
suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
if suite[:dirs]
suite[:dirs].each { |f| spec_files += Dir["#{f}/**/*_spec.rb"] }
end
t.pattern = spec_files
if suite[:tag]
t.rspec_opts = "--tag #{suite[:tag]}"
end
t.rspec_opts = "--tag #{suite[:tag]}" if suite[:tag]
end
end
end
end
task :default => "spec:suite:idiomatic" # this should be spec:suite:bidi
task "spec:suite:wrapper" => :compile
task "spec:suite:idiomatic" => "spec:suite:wrapper"
task "spec:suite:bidi" => "spec:suite:idiomatic"
task default: 'spec:suite:idiomatic' # this should be spec:suite:bidi
task 'spec:suite:wrapper' => :compile
task 'spec:suite:idiomatic' => 'spec:suite:wrapper'
task 'spec:suite:bidi' => 'spec:suite:idiomatic'

@ -65,7 +65,7 @@ end
# creates a Credentials from the test certificates.
def test_creds
certs = load_test_certs
creds = GRPC::Core::Credentials.new(certs[0])
GRPC::Core::Credentials.new(certs[0])
end
# creates a test stub that accesses host:port securely.
@ -73,15 +73,15 @@ def create_stub(host, port)
address = "#{host}:#{port}"
stub_opts = {
:creds => test_creds,
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com',
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com'
}
logger.info("... connecting securely to #{address}")
stub = Grpc::Testing::TestService::Stub.new(address, **stub_opts)
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
end
# produces a string of null chars (\0) of length l.
def nulls(l)
raise 'requires #{l} to be +ve' if l < 0
fail 'requires #{l} to be +ve' if l < 0
[].pack('x' * l).force_encoding('utf-8')
end
@ -106,9 +106,9 @@ class PingPongPlayer
count = 0
@msg_sizes.each do |m|
req_size, resp_size = m
req = req_cls.new(:payload => Payload.new(:body => nulls(req_size)),
:response_type => COMPRESSABLE,
:response_parameters => [p_cls.new(:size => resp_size)])
req = req_cls.new(payload: Payload.new(body: nulls(req_size)),
response_type: COMPRESSABLE,
response_parameters: [p_cls.new(size: resp_size)])
yield req
resp = @queue.pop
assert_equal(PayloadType.lookup(COMPRESSABLE), resp.payload.type,
@ -148,11 +148,11 @@ class NamedTests
# ruby server
# FAILED
def large_unary
req_size, wanted_response_size = 271828, 314159
payload = Payload.new(:type => COMPRESSABLE, :body => nulls(req_size))
req = SimpleRequest.new(:response_type => COMPRESSABLE,
:response_size => wanted_response_size,
:payload => payload)
req_size, wanted_response_size = 271_828, 314_159
payload = Payload.new(type: COMPRESSABLE, body: nulls(req_size))
req = SimpleRequest.new(response_type: COMPRESSABLE,
response_size: wanted_response_size,
payload: payload)
resp = @stub.unary_call(req)
assert_equal(wanted_response_size, resp.payload.body.length,
'large_unary: payload had the wrong length')
@ -166,11 +166,11 @@ class NamedTests
# ruby server
# FAILED
def client_streaming
msg_sizes = [27182, 8, 1828, 45904]
wanted_aggregate_size = 74922
msg_sizes = [27_182, 8, 1828, 45_904]
wanted_aggregate_size = 74_922
reqs = msg_sizes.map do |x|
req = Payload.new(:body => nulls(x))
StreamingInputCallRequest.new(:payload => req)
req = Payload.new(body: nulls(x))
StreamingInputCallRequest.new(payload: req)
end
resp = @stub.streaming_input_call(reqs)
assert_equal(wanted_aggregate_size, resp.aggregated_payload_size,
@ -183,10 +183,10 @@ class NamedTests
# ruby server
# FAILED
def server_streaming
msg_sizes = [31415, 9, 2653, 58979]
response_spec = msg_sizes.map { |s| ResponseParameters.new(:size => s) }
req = StreamingOutputCallRequest.new(:response_type => COMPRESSABLE,
:response_parameters => response_spec)
msg_sizes = [31_415, 9, 2653, 58_979]
response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) }
req = StreamingOutputCallRequest.new(response_type: COMPRESSABLE,
response_parameters: response_spec)
resps = @stub.streaming_output_call(req)
resps.each_with_index do |r, i|
assert i < msg_sizes.length, 'too many responses'
@ -203,13 +203,12 @@ class NamedTests
# ruby server
# FAILED
def ping_pong
msg_sizes = [[27182, 31415], [8, 9], [1828, 2653], [45904, 58979]]
msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
ppp = PingPongPlayer.new(msg_sizes)
resps = @stub.full_duplex_call(ppp.each_item)
resps.each { |r| ppp.queue.push(r) }
p 'OK: ping_pong'
end
end
# validates the the command line options, returning them as a Hash.
@ -217,7 +216,7 @@ def parse_options
options = {
'server_host' => nil,
'server_port' => nil,
'test_case' => nil,
'test_case' => nil
}
OptionParser.new do |opts|
opts.banner = 'Usage: --server_host <server_host> --server_port server_port'
@ -228,17 +227,17 @@ def parse_options
options['server_port'] = v
end
# instance_methods(false) gives only the methods defined in that class
test_cases = NamedTests.instance_methods(false).map { |t| t.to_s }
test_cases = NamedTests.instance_methods(false).map(&:to_s)
test_case_list = test_cases.join(',')
opts.on("--test_case CODE", test_cases, {}, "select a test_case",
opts.on('--test_case CODE', test_cases, {}, 'select a test_case',
" (#{test_case_list})") do |v|
options['test_case'] = v
end
end.parse!
['server_host', 'server_port', 'test_case'].each do |arg|
%w(server_host, server_port, test_case).each do |arg|
if options[arg].nil?
raise OptionParser::MissingArgument.new("please specify --#{arg}")
fail(OptionParser::MissingArgument, "please specify --#{arg}")
end
end
options

@ -62,12 +62,12 @@ end
# creates a ServerCredentials from the test certificates.
def test_server_creds
certs = load_test_certs
server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
end
# produces a string of null chars (\0) of length l.
def nulls(l)
raise 'requires #{l} to be +ve' if l < 0
fail 'requires #{l} to be +ve' if l < 0
[].pack('x' * l).force_encoding('utf-8')
end
@ -86,7 +86,7 @@ class EnumeratorQueue
loop do
r = @q.pop
break if r.equal?(@sentinel)
raise r if r.is_a?Exception
fail r if r.is_a? Exception
yield r
end
end
@ -98,27 +98,27 @@ class TestTarget < Grpc::Testing::TestService::Service
include Grpc::Testing
include Grpc::Testing::PayloadType
def empty_call(empty, call)
def empty_call(_empty, _call)
Empty.new
end
def unary_call(simple_req, call)
def unary_call(simple_req, _call)
req_size = simple_req.response_size
SimpleResponse.new(:payload => Payload.new(:type => COMPRESSABLE,
:body => nulls(req_size)))
SimpleResponse.new(payload: Payload.new(type: COMPRESSABLE,
body: nulls(req_size)))
end
def streaming_input_call(call)
sizes = call.each_remote_read.map { |x| x.payload.body.length }
sum = sizes.inject { |sum,x| sum + x }
StreamingInputCallResponse.new(:aggregated_payload_size => sum)
sum = sizes.inject { |s, x| s + x }
StreamingInputCallResponse.new(aggregated_payload_size: sum)
end
def streaming_output_call(req, call)
def streaming_output_call(req, _call)
cls = StreamingOutputCallResponse
req.response_parameters.map do |p|
cls.new(:payload => Payload.new(:type => req.response_type,
:body => nulls(p.size)))
cls.new(payload: Payload.new(type: req.response_type,
body: nulls(p.size)))
end
end
@ -126,13 +126,13 @@ class TestTarget < Grpc::Testing::TestService::Service
# reqs is a lazy Enumerator of the requests sent by the client.
q = EnumeratorQueue.new(self)
cls = StreamingOutputCallResponse
t = Thread.new do
Thread.new do
begin
reqs.each do |req|
logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
resp = cls.new(:payload => Payload.new(:type => req.response_type,
:body => nulls(resp_size)))
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
logger.info('finished reads')
@ -149,13 +149,12 @@ class TestTarget < Grpc::Testing::TestService::Service
# currently used in any tests
full_duplex_call(reqs)
end
end
# validates the the command line options, returning them as a Hash.
def parse_options
options = {
'port' => nil,
'port' => nil
}
OptionParser.new do |opts|
opts.banner = 'Usage: --port port'
@ -165,7 +164,7 @@ def parse_options
end.parse!
if options['port'].nil?
raise OptionParser::MissingArgument.new("please specify --port")
fail(OptionParser::MissingArgument, 'please specify --port')
end
options
end

@ -29,7 +29,6 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Sample app that accesses a Calc service running on a Ruby gRPC server and
# helps validate RpcServer as a gRPC server using proto2 serialization.
#
@ -49,9 +48,9 @@ include GRPC::Core::TimeConsts
def do_div(stub)
logger.info('request_response')
logger.info('----------------')
req = Math::DivArgs.new(:dividend => 7, :divisor => 3)
req = Math::DivArgs.new(dividend: 7, divisor: 3)
logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, deadline=INFINITE_FUTURE)
resp = stub.div(req, INFINITE_FUTURE)
logger.info("Answer: #{resp.inspect}")
logger.info('----------------')
end
@ -60,7 +59,7 @@ def do_sum(stub)
# to make client streaming requests, pass an enumerable of the inputs
logger.info('client_streamer')
logger.info('---------------')
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(:num => x) }
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) }
logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
resp = stub.sum(reqs) # reqs.is_a?(Enumerable)
logger.info("Answer: #{resp.inspect}")
@ -70,9 +69,9 @@ end
def do_fib(stub)
logger.info('server_streamer')
logger.info('----------------')
req = Math::FibArgs.new(:limit => 11)
req = Math::FibArgs.new(limit: 11)
logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, deadline=INFINITE_FUTURE)
resp = stub.fib(req, INFINITE_FUTURE)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
end
@ -83,11 +82,11 @@ def do_div_many(stub)
logger.info('bidi_streamer')
logger.info('-------------')
reqs = []
reqs << Math::DivArgs.new(:dividend => 7, :divisor => 3)
reqs << Math::DivArgs.new(:dividend => 5, :divisor => 2)
reqs << Math::DivArgs.new(:dividend => 7, :divisor => 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 3)
reqs << Math::Di5AvArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, deadline=10)
resp = stub.div_many(reqs, 10)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
end
@ -103,7 +102,7 @@ end
def test_creds
certs = load_test_certs
creds = GRPC::Core::Credentials.new(certs[0])
GRPC::Core::Credentials.new(certs[0])
end
def main
@ -117,7 +116,7 @@ def main
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
options['secure'] = true
options['secure'] = v
end
end.parse!
@ -128,7 +127,7 @@ def main
if options['secure']
stub_opts = {
:creds => test_creds,
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com',
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com'
}
p stub_opts
p options['host']

@ -46,9 +46,8 @@ require 'optparse'
# Holds state for a fibonacci series
class Fibber
def initialize(limit)
raise "bad limit: got #{limit}, want limit > 0" if limit < 1
fail "bad limit: got #{limit}, want limit > 0" if limit < 1
@limit = limit
end
@ -57,14 +56,14 @@ class Fibber
idx, current, previous = 0, 1, 1
until idx == @limit
if idx == 0 || idx == 1
yield Math::Num.new(:num => 1)
yield Math::Num.new(num: 1)
idx += 1
next
end
tmp = current
current = previous + current
previous = tmp
yield Math::Num.new(:num => current)
yield Math::Num.new(num: current)
idx += 1
end
end
@ -85,43 +84,41 @@ class EnumeratorQueue
loop do
r = @q.pop
break if r.equal?(@sentinel)
raise r if r.is_a?Exception
fail r if r.is_a? Exception
yield r
end
end
end
# The Math::Math:: module occurs because the service has the same name as its
# package. That practice should be avoided by defining real services.
class Calculator < Math::Math::Service
def div(div_args, call)
def div(div_args, _call)
if div_args.divisor == 0
# To send non-OK status handlers raise a StatusError with the code and
# and detail they want sent as a Status.
raise GRPC::StatusError.new(GRPC::Status::INVALID_ARGUMENT,
fail GRPC::StatusError.new(GRPC::Status::INVALID_ARGUMENT,
'divisor cannot be 0')
end
Math::DivReply.new(:quotient => div_args.dividend/div_args.divisor,
:remainder => div_args.dividend % div_args.divisor)
Math::DivReply.new(quotient: div_args.dividend / div_args.divisor,
remainder: div_args.dividend % div_args.divisor)
end
def sum(call)
# the requests are accesible as the Enumerator call#each_request
nums = call.each_remote_read.collect { |x| x.num }
sum = nums.inject { |sum,x| sum + x }
Math::Num.new(:num => sum)
nums = call.each_remote_read.collect(&:num)
sum = nums.inject { |s, x| s + x }
Math::Num.new(num: sum)
end
def fib(fib_args, call)
def fib(fib_args, _call)
if fib_args.limit < 1
raise StatusError.new(Status::INVALID_ARGUMENT, 'limit must be >= 0')
fail StatusError.new(Status::INVALID_ARGUMENT, 'limit must be >= 0')
end
# return an Enumerator of Nums
Fibber.new(fib_args.limit).generator()
Fibber.new(fib_args.limit).generator
# just return the generator, GRPC::GenericServer sends each actual response
end
@ -132,10 +129,10 @@ class Calculator < Math::Math::Service
begin
requests.each do |req|
logger.info("read #{req.inspect}")
resp = Math::DivReply.new(:quotient => req.dividend/req.divisor,
:remainder => req.dividend % req.divisor)
resp = Math::DivReply.new(quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor)
q.push(resp)
Thread::pass # let the internal Bidi threads run
Thread.pass # let the internal Bidi threads run
end
logger.info('finished reads')
q.push(self)
@ -147,7 +144,6 @@ class Calculator < Math::Math::Service
t.priority = -2 # hint that the div_many thread should not be favoured
q.each_item
end
end
def load_test_certs
@ -159,7 +155,7 @@ end
def test_server_creds
certs = load_test_certs
server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
end
def main
@ -173,7 +169,7 @@ def main
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
options['secure'] = true
options['secure'] = v
end
end.parse!

@ -40,16 +40,18 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
require 'optparse'
# a simple non-protobuf message class.
class NoProtoMsg
def self.marshal(o)
def self.marshal(_o)
''
end
def self.unmarshal(o)
def self.unmarshal(_o)
NoProtoMsg.new
end
end
# service the uses the non-protobuf message class.
class NoProtoService
include GRPC::GenericService
rpc :AnRPC, NoProtoMsg, NoProtoMsg
@ -66,7 +68,7 @@ end
def test_creds
certs = load_test_certs
creds = GRPC::Core::Credentials.new(certs[0])
GRPC::Core::Credentials.new(certs[0])
end
def main
@ -80,14 +82,14 @@ def main
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
options['secure'] = true
options['secure'] = v
end
end.parse!
if options['secure']
stub_opts = {
:creds => test_creds,
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com',
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com'
}
p stub_opts
p options['host']

@ -40,26 +40,29 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc'
require 'optparse'
# a simple non-protobuf message class.
class NoProtoMsg
def self.marshal(o)
def self.marshal(_o)
''
end
def self.unmarshal(o)
def self.unmarshal(_o)
NoProtoMsg.new
end
end
# service the uses the non-protobuf message class.
class NoProtoService
include GRPC::GenericService
rpc :AnRPC, NoProtoMsg, NoProtoMsg
end
# an implementation of the non-protobuf service.
class NoProto < NoProtoService
def initialize(default_var='ignored')
def initialize(_default_var = 'ignored')
end
def an_rpc(req, call)
def an_rpc(req, _call)
logger.info('echo service received a request')
req
end
@ -74,7 +77,7 @@ end
def test_server_creds
certs = load_test_certs
server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
end
def main
@ -88,7 +91,7 @@ def main
options['host'] = v
end
opts.on('-s', '--secure', 'access using test creds') do |v|
options['secure'] = true
options['secure'] = v
end
end.parse!
@ -106,5 +109,4 @@ def main
s.run
end
main

@ -40,7 +40,7 @@ HEADER_DIRS = [
'/usr/local/include',
# Check the ruby install locations
INCLUDEDIR,
INCLUDEDIR
]
LIB_DIRS = [
@ -51,11 +51,11 @@ LIB_DIRS = [
'/usr/local/lib',
# Check the ruby install locations
LIBDIR,
LIBDIR
]
def crash(msg)
print(" extconf failure: %s\n" % msg)
print(" extconf failure: #{msg}\n")
exit 1
end

@ -1,31 +1,34 @@
# encoding: utf-8
$:.push File.expand_path("../lib", __FILE__)
$LOAD_PATH.push File.expand_path('../lib', __FILE__)
require 'grpc/version'
Gem::Specification.new do |s|
s.name = "grpc"
s.name = 'grpc'
s.version = Google::RPC::VERSION
s.authors = ["One Platform Team"]
s.email = "stubby-team@google.com"
s.homepage = "http://go/grpc"
s.authors = ['One Platform Team']
s.email = 'stubby-team@google.com'
s.homepage = 'http://go/grpc'
s.summary = 'Google RPC system in Ruby'
s.description = 'Send RPCs from Ruby'
s.files = `git ls-files`.split("\n")
s.test_files = `git ls-files -- spec/*`.split("\n")
s.executables = `git ls-files -- examples/*.rb`.split("\n").map{ |f| File.basename(f) }
s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f|
File.basename(f)
end
s.require_paths = ['lib']
s.platform = Gem::Platform::RUBY
s.add_dependency 'xray'
s.add_dependency 'logging', '~> 1.8'
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
s.add_dependency 'minitest', '~> 5.4' # not a dev dependency, used by the interop tests
s.add_dependency 'google-protobuf', '~> 3.0.0alpha'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
s.add_development_dependency "bundler", "~> 1.7"
s.add_development_dependency "rake", "~> 10.0"
s.add_development_dependency 'bundler', '~> 1.7'
s.add_development_dependency 'rake', '~> 10.0'
s.add_development_dependency 'rake-compiler', '~> 0'
s.add_development_dependency 'rspec', "~> 3.0"
s.add_development_dependency 'rubocop', '~> 0.28.0'
s.add_development_dependency 'rspec', '~> 3.0'
s.extensions = %w[ext/grpc/extconf.rb]
s.extensions = %w(ext/grpc/extconf.rb)
end

@ -29,25 +29,21 @@
require 'beefcake'
module Beefcake
# Re-open the beefcake message module to add a static encode
#
# This is a temporary measure while beefcake is used as the default proto
# library for developing grpc ruby. Once that changes to the official proto
# library this can be removed. It's necessary to allow the update the service
# module to assume a static encode method.
#
# TODO(temiola): remove me, once official code generation is available in protoc
module Beefcake
# TODO(temiola): remove this.
module Message
# additional mixin module that adds static encode method when include
module StaticEncode
# encodes o with its instance#encode method
def encode(o)
o.encode
end
end
# extend self.included in Beefcake::Message to include StaticEncode
@ -57,6 +53,5 @@ module Beefcake
o.extend Decode
o.send(:include, Encode)
end
end
end

@ -30,9 +30,12 @@
module Google
module RPC
module Core
class Event # Add an inspect method to C-defined Event class.
# Event is a class defined in the c extension
#
# Here, we add an inspect method.
class Event
def inspect
'<%s: type:%s, tag:%s result:%s>' % [self.class, type, tag, result]
"<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
end
end
end

@ -32,9 +32,10 @@ require 'grpc'
module Google
module RPC
module Core
module TimeConsts # re-opens a module in the C extension.
# TimeConsts is a module from the C extension.
#
# Here it's re-opened to add a utility func.
module TimeConsts
# Converts a time delta to an absolute deadline.
#
# Assumes timeish is a relative time, and converts its to an absolute,
@ -53,19 +54,18 @@ module Google
elsif timeish.nil?
TimeConsts::ZERO
elsif !timeish.is_a? Numeric
raise TypeError('Cannot make an absolute deadline from %s',
timeish.inspect)
fail(TypeError,
"Cannot make an absolute deadline from #{timeish.inspect}")
elsif timeish < 0
TimeConsts::INFINITE_FUTURE
elsif timeish == 0
TimeConsts::ZERO
else !timeish.nil?
else
Time.now + timeish
end
end
module_function :from_relative_time
end
end
end

@ -30,9 +30,8 @@
require 'grpc'
module Google
# Google::RPC contains the General RPC module.
module RPC
# OutOfTime is an exception class that indicates that an RPC exceeded its
# deadline.
OutOfTime = Class.new(StandardError)
@ -42,7 +41,6 @@ module Google
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
class BadStatus < StandardError
attr_reader :code, :details
# @param code [Numeric] the status code
@ -60,9 +58,6 @@ module Google
def to_status
Status.new(code, details)
end
end
end
end

@ -31,13 +31,14 @@ require 'forwardable'
require 'grpc/generic/bidi_call'
def assert_event_type(ev, want)
raise OutOfTime if ev.nil?
fail OutOfTime if ev.nil?
got = ev.type
raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want
fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
end
module Google::RPC
module Google
# Google::RPC contains the General RPC module.
module RPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
@ -60,19 +61,20 @@ module Google::RPC
# @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED
# @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED
def self.client_start_invoke(call, q, deadline, **kw)
raise ArgumentError.new('not a call') unless call.is_a?Core::Call
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
def self.client_start_invoke(call, q, _deadline, **kw)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
invoke_accepted, client_metadata_read = Object.new, Object.new
finished_tag = Object.new
call.start_invoke(q, invoke_accepted, client_metadata_read, finished_tag)
call.start_invoke(q, invoke_accepted, client_metadata_read,
finished_tag)
# wait for the invocation to be accepted
ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
fail OutOfTime if ev.nil?
ev.close
[finished_tag, client_metadata_read]
@ -102,12 +104,12 @@ module Google::RPC
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] (default true) indicates if the call has begun
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
raise ArgumentError.new('not a call') unless call.is_a?Core::Call
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@ -147,8 +149,8 @@ module Google::RPC
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that was
# happening
# - the call may terminate with a different BadStatus exception if that
# was happening
def cancel
@call.cancel
end
@ -199,12 +201,11 @@ module Google::RPC
ev.close
end
if assert_finished
return unless assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
raise "unexpected event: #{ev.inspect}" if ev.nil?
fail 'unexpected nil event' if ev.nil?
ev.close
return @call.status
end
@call.status
end
# finished waits until the call is completed.
@ -214,7 +215,7 @@ module Google::RPC
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@ -222,7 +223,7 @@ module Google::RPC
end
if ev.result.code != Core::StatusCodes::OK
raise BadStatus.new(ev.result.code, ev.result.details)
fail BadStatus.new(ev.result.code, ev.result.details)
end
res = ev.result
ensure
@ -241,7 +242,7 @@ module Google::RPC
# marshalled.
def remote_send(req, marshalled = false)
assert_queue_is_ready
logger.debug("sending payload #{req.inspect}, marshalled? #{marshalled}")
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
if marshalled
payload = req
else
@ -275,9 +276,7 @@ module Google::RPC
ev.close
end
logger.debug("Status sent: #{code}:'#{details}'")
if assert_finished
return finished
end
return finished if assert_finished
nil
end
@ -285,7 +284,8 @@ module Google::RPC
#
# It blocks until the remote endpoint sends a READ or FINISHED event. On
# a READ, it returns the response after unmarshalling it. On
# FINISHED, it returns nil if the status is OK, otherwise raising BadStatus
# FINISHED, it returns nil if the status is OK, otherwise raising
# BadStatus
def remote_read
if @call.metadata.nil? && !@read_metadata_tag.nil?
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
@ -299,8 +299,8 @@ module Google::RPC
begin
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
if !ev.result.nil?
logger.debug("received req.to_s: #{ev.result.to_s}")
unless ev.result.nil?
logger.debug("received req.to_s: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
@ -330,10 +330,10 @@ module Google::RPC
#
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) if !block_given?
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read()
break if resp.is_a?Struct::Status # is an OK status, bad statii raise
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
@ -360,10 +360,10 @@ module Google::RPC
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) if !block_given?
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.is_a?Struct::Status # is an OK status, bad statii raise
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
@ -386,9 +386,7 @@ module Google::RPC
remote_send(req)
writes_done(false)
response = remote_read
if !response.is_a?(Struct::Status) # finish if status not yet received
finished
end
finished unless response.is_a? Struct::Status
response
end
@ -411,9 +409,7 @@ module Google::RPC
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
if !response.is_a?(Struct::Status) # finish if status not yet received
finished
end
finished unless response.is_a? Struct::Status
response
end
@ -439,7 +435,7 @@ module Google::RPC
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
return replies if !block_given?
return replies unless block_given?
replies.each { |r| yield r }
end
@ -451,10 +447,11 @@ module Google::RPC
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's #each
# enumeration protocol. In the simplest case, requests will be an array of
# marshallable objects; in typical case it will be an Enumerable that
# allows dynamic construction of the marshallable objects.
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
@ -526,8 +523,8 @@ module Google::RPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute, :metadata,
:status)
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status)
# confirms that no events are enqueued, and that the queue is not
# shutdown.
@ -535,15 +532,15 @@ module Google::RPC
ev = nil
begin
ev = @cq.pluck(self, ZERO)
raise "unexpected event #{ev.inspect}" unless ev.nil?
fail "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
logging.debug('timed out waiting for next event')
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
ensure
ev.close unless ev.nil?
end
end
end
end
end

@ -31,13 +31,14 @@ require 'forwardable'
require 'grpc/grpc'
def assert_event_type(ev, want)
raise OutOfTime if ev.nil?
fail OutOfTime if ev.nil?
got = ev.type
raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want
fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
end
module Google::RPC
module Google
# Google::RPC contains the General RPC module.
module RPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
@ -49,8 +50,9 @@ module Google::RPC
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.start_invoke followed by receipt of the corresponding
# INVOKE_ACCEPTED. On the server, this is after call.accept.
# accepted after call.start_invoke followed by receipt of the
# corresponding INVOKE_ACCEPTED. On the server, this is after
# call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
@ -66,9 +68,9 @@ module Google::RPC
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
raise ArgumentError.new('not a call') unless call.is_a?Core::Call
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@ -110,7 +112,7 @@ module Google::RPC
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
enq_th = start_write_loop(replys, is_client: false)
loop_th = start_read_loop()
loop_th = start_read_loop
loop_th.join
enq_th.join
end
@ -125,7 +127,7 @@ module Google::RPC
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) if !block_given?
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
logger.debug("each_queued_msg: msg##{count}")
@ -180,8 +182,8 @@ module Google::RPC
end
# starts the read loop
def start_read_loop()
t = Thread.new do
def start_read_loop
Thread.new do
begin
read_tag = Object.new
count = 0
@ -203,7 +205,7 @@ module Google::RPC
end
# push the latest read onto the queue and continue reading
logger.debug("received req.to_s: #{ev.result.to_s}")
logger.debug("received req: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
ensure
@ -218,7 +220,6 @@ module Google::RPC
end
end
end
end
end
end

@ -30,8 +30,9 @@
require 'grpc/generic/active_call'
require 'xray/thread_dump_signal_handler'
module Google::RPC
module Google
# Google::RPC contains the General RPC module.
module RPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
@ -53,9 +54,9 @@ module Google::RPC
# channel:
#
# - :channel_override
# when present, this must be a pre-created GRPC::Channel. If it's present
# the host and arbitrary keyword arg areignored, and the RPC connection uses
# this channel.
# when present, this must be a pre-created GRPC::Channel. If it's
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
# - :deadline
# when present, this is the default deadline used for calls
@ -69,7 +70,7 @@ module Google::RPC
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param creds [Core::Credentials] secures and/or authenticates the channel
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
@ -78,33 +79,34 @@ module Google::RPC
creds: nil,
update_metadata: nil,
**kw)
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@queue = q
# set the channel instance
if !channel_override.nil?
ch = channel_override
raise ArgumentError.new('not a Channel') unless ch.is_a?(Core::Channel)
elsif creds.nil?
fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
else
if creds.nil?
ch = Core::Channel.new(host, kw)
elsif !creds.is_a?(Core::Credentials)
raise ArgumentError.new('not a Credentials')
fail(ArgumentError, 'not a Credentials')
else
ch = Core::Channel.new(host, kw, creds)
end
end
@ch = ch
@update_metadata = nil
if !update_metadata.nil?
if !update_metadata.is_a?(Proc)
raise ArgumentError.new('update_metadata is not a Proc')
unless update_metadata.nil?
unless update_metadata.is_a? Proc
fail(ArgumentError, 'update_metadata is not a Proc')
end
@update_metadata = update_metadata
end
@host = host
@deadline = deadline
end
@ -144,7 +146,7 @@ module Google::RPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] (default false) return an Operation if true
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
@ -201,7 +203,7 @@ module Google::RPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false] (default false) return an Operation if true
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
@ -234,8 +236,8 @@ module Google::RPC
# * the request is sent only when GRPC core's flow control allows it to
# be sent.
#
# * the request will not complete until the server sends the final response
# followed by a status message.
# * the request will not complete until the server sends the final
# response followed by a status message.
#
# == Errors ==
# An RuntimeError is raised if
@ -266,7 +268,7 @@ module Google::RPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false] (default false) return an Operation if true
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, deadline = nil,
@ -292,10 +294,11 @@ module Google::RPC
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's #each
# enumeration protocol. In the simplest case, requests will be an array of
# marshallable objects; in typical case it will be an Enumerable that
# allows dynamic construction of the marshallable objects.
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
@ -362,7 +365,8 @@ module Google::RPC
#
# if return_op is true, the function returns an Operation whose #execute
# method runs the Bidi call. Again, Operation#execute either calls a
# given block with each response or returns an Enumerator of the responses.
# given block with each response or returns an Enumerator of the
# responses.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
@ -370,7 +374,7 @@ module Google::RPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] (default false) return an Operation if true
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
@ -388,6 +392,7 @@ module Google::RPC
end
private
# Creates a new active stub
#
# @param ch [GRPC::Channel] the channel used to create the stub.
@ -400,7 +405,6 @@ module Google::RPC
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
end
end
end
end

@ -29,8 +29,8 @@
require 'grpc/grpc'
module Google::RPC
module Google
module RPC
# RpcDesc is a Descriptor of an RPC method.
class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
:unmarshal_method)
@ -47,7 +47,7 @@ module Google::RPC
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
Proc.new { |o| o.class.method(marshal_method).call(o).to_s }
proc { |o| o.class.method(marshal_method).call(o).to_s }
end
# @param [:input, :output] target determines whether to produce the an
@ -56,27 +56,24 @@ module Google::RPC
#
# @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
def unmarshal_proc(target)
raise ArgumentError if not [:input, :output].include?(target)
fail ArgumentError unless [:input, :output].include?(target)
unmarshal_class = method(target).call
if unmarshal_class.is_a?Stream
unmarshal_class = unmarshal_class.type
end
Proc.new { |o| unmarshal_class.method(unmarshal_method).call(o) }
unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
begin
if is_request_response?
if request_response?
req = active_call.remote_read
resp = mth.call(req, active_call.single_req_view)
active_call.remote_send(resp)
elsif is_client_streamer?
elsif client_streamer?
resp = mth.call(active_call.multi_req_view)
active_call.remote_send(resp)
elsif is_server_streamer?
elsif server_streamer?
req = active_call.remote_read
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
@ -88,7 +85,7 @@ module Google::RPC
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
logger.debug("app error: #{active_call}, status:#{e.code}:#{e.details}")
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
@ -110,33 +107,32 @@ module Google::RPC
logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
end
def assert_arity_matches(mth)
if (is_request_response? || is_server_streamer?)
if request_response? || server_streamer?
if mth.arity != 2
raise arity_error(mth, 2, "should be #{mth.name}(req, call)")
fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
end
else
if mth.arity != 1
raise arity_error(mth, 1, "should be #{mth.name}(call)")
fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
end
end
def is_request_response?
def request_response?
!input.is_a?(Stream) && !output.is_a?(Stream)
end
def is_client_streamer?
def client_streamer?
input.is_a?(Stream) && !output.is_a?(Stream)
end
def is_server_streamer?
def server_streamer?
!input.is_a?(Stream) && output.is_a?(Stream)
end
def is_bidi_streamer?
def bidi_streamer?
input.is_a?(Stream) && output.is_a?(Stream)
end
@ -145,15 +141,12 @@ module Google::RPC
end
def send_status(active_client, code, details)
begin
details = 'Not sure why' if details.nil?
active_client.send_status(code, details)
rescue StandardError => e
logger.warn('Could not send status %d:%s' % [code, details])
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
end
end
end
end

@ -33,8 +33,9 @@ require 'grpc/generic/service'
require 'thread'
require 'xray/thread_dump_signal_handler'
module Google::RPC
module Google
# Google::RPC contains the General RPC module.
module RPC
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
@ -84,26 +85,28 @@ module Google::RPC
creds:nil,
server_override:nil,
**kw)
if !completion_queue_override.nil?
if completion_queue_override.nil?
cq = Core::CompletionQueue.new
else
cq = completion_queue_override
if !cq.is_a?(Core::CompletionQueue)
raise ArgumentError.new('not a CompletionQueue')
unless cq.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
else
cq = Core::CompletionQueue.new
end
@cq = cq
if !server_override.nil?
srv = server_override
raise ArgumentError.new('not a Server') unless srv.is_a?(Core::Server)
elsif creds.nil?
if server_override.nil?
if creds.nil?
srv = Core::Server.new(@cq, kw)
elsif !creds.is_a?(Core::ServerCredentials)
raise ArgumentError.new('not a ServerCredentials')
elsif !creds.is_a? Core::ServerCredentials
fail(ArgumentError, 'not a ServerCredentials')
else
srv = Core::Server.new(@cq, kw, creds)
end
else
srv = server_override
fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
end
@server = srv
@pool_size = pool_size
@ -119,11 +122,10 @@ module Google::RPC
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
if @running
return unless @running
@stopped = true
@pool.stop
end
end
# determines if the server is currently running
def running?
@ -139,12 +141,10 @@ module Google::RPC
def wait_till_running(timeout = 0.1)
end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
while Time.now < end_time
if !running?
@run_mutex.synchronize { @run_cond.wait(@run_mutex) }
end
@run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
sleep(sleep_period)
end
return running?
running?
end
# determines if the server is currently stopped
@ -179,14 +179,14 @@ module Google::RPC
#
# It raises RuntimeError:
# - if service is not valid service class or object
# - if it is a valid service, but the handler methods are already registered
# - its handler methods are already registered
# - if the server is already running
#
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
raise 'cannot add services if the server is running' if running?
raise 'cannot add services if the server is stopped' if stopped?
fail 'cannot add services if the server is running' if running?
fail 'cannot add services if the server is stopped' if stopped?
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
@ -211,7 +211,7 @@ module Google::RPC
@pool.start
@server.start
server_tag = Object.new
while !stopped?
until stopped?
@server.request_call(server_tag)
ev = @cq.pluck(server_tag, @poll_period)
next if ev.nil?
@ -221,7 +221,7 @@ module Google::RPC
next
end
c = new_active_server_call(ev.call, ev.result)
if !c.nil?
unless c.nil?
mth = ev.result.method.to_sym
ev.close
@pool.schedule(c) do |call|
@ -233,37 +233,39 @@ module Google::RPC
end
def new_active_server_call(call, new_server_rpc)
# TODO(temiola): perhaps reuse the main server completion queue here, but
# for now, create a new completion queue per call, pending best practice
# usage advice from the c core.
# TODO(temiola): perhaps reuse the main server completion queue here,
# but for now, create a new completion queue per call, pending best
# practice usage advice from the c core.
# Accept the call. This is necessary even if a status is to be sent back
# immediately
# Accept the call. This is necessary even if a status is to be sent
# back immediately
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata on the call
call.metadata = new_server_rpc.metadata # store the metadata
call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata()
call.server_end_initial_metadata
# Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
if @pool.jobs_waiting > @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
noop = Proc.new { |x| x }
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline, finished_tag: finished_tag)
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::UNAVAILABLE, '')
return nil
end
# Send NOT_FOUND if the method does not exist
mth = new_server_rpc.method.to_sym
if !rpc_descs.has_key?(mth)
unless rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{new_server_rpc}")
noop = Proc.new { |x| x }
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline, finished_tag: finished_tag)
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '')
return nil
end
@ -278,9 +280,8 @@ module Google::RPC
# Pool is a simple thread pool for running server requests.
class Pool
def initialize(size)
raise 'pool size must be positive' unless size > 0
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@ -299,7 +300,7 @@ module Google::RPC
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
raise 'already stopped' if @stopped
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
@ -307,7 +308,7 @@ module Google::RPC
# Starts running the jobs in the thread pool.
def start
raise 'already stopped' if @stopped
fail 'already stopped' if @stopped
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
@ -322,13 +323,11 @@ module Google::RPC
end
end
# removes the threads from workers, and signal when all the threads
# are complete.
# removes the threads from workers, and signal when all the
# threads are complete.
@stop_mutex.synchronize do
@workers.delete(Thread.current)
if @workers.size == 0
@stop_cond.signal
end
@stop_cond.signal if @workers.size == 0
end
end
@workers << next_thread
@ -344,9 +343,7 @@ module Google::RPC
# TODO(temiola): allow configuration of the keepalive period
keep_alive = 5
@stop_mutex.synchronize do
if @workers.size > 0
@stop_cond.wait(@stop_mutex, keep_alive)
end
@stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
end
# Forcibly shutdown any threads that are still alive.
@ -365,7 +362,6 @@ module Google::RPC
logger.info('stopped, all workers are shutdown')
end
end
protected
@ -381,11 +377,11 @@ module Google::RPC
private
def assert_valid_service_class(cls)
if !cls.include?(GenericService)
raise "#{cls} should 'include GenericService'"
unless cls.include?(GenericService)
fail "#{cls} should 'include GenericService'"
end
if cls.rpc_descs.size == 0
raise "#{cls} should specify some rpc descriptions"
fail "#{cls} should specify some rpc descriptions"
end
cls.assert_rpc_descs_have_methods
end
@ -396,8 +392,8 @@ module Google::RPC
handlers = rpc_handlers
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
if specs.has_key?(route)
raise "Cannot add rpc #{route} from #{spec}, already registered"
if specs.key? route
fail "Cannot add rpc #{route} from #{spec}, already registered"
else
specs[route] = spec
if service.is_a?(Class)
@ -410,5 +406,5 @@ module Google::RPC
end
end
end
end
end

@ -32,7 +32,6 @@ require 'grpc/generic/rpc_desc'
# Extend String to add a method underscore
class String
# creates a new string that is the underscore separate version of this one.
#
# E.g,
@ -40,24 +39,23 @@ class String
# AMethod -> a_method
# AnRpc -> an_rpc
def underscore
word = self.dup
word = dup
word.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
word.gsub!(/([a-z\d])([A-Z])/, '\1_\2')
word.tr!('-', '_')
word.downcase!
word
end
end
module Google::RPC
module Google
# Google::RPC contains the General RPC module.
module RPC
# Provides behaviour used to implement schema-derived service classes.
#
# Is intended to be used to support both client and server IDL-schema-derived
# servers.
# Is intended to be used to support both client and server
# IDL-schema-derived servers.
module GenericService
# Used to indicate that a name has already been specified
class DuplicateRpcName < StandardError
def initialize(name)
@ -80,7 +78,6 @@ module Google::RPC
# #assert_rpc_descs_have_methods is used to ensure the including class
# provides methods with signatures that support all the descriptors.
module Dsl
# This configures the method names that the serializable message
# implementation uses to marshal and unmarshal messages.
#
@ -110,7 +107,7 @@ module Google::RPC
# @param input [Object] the input parameter's class
# @param output [Object] the output parameter's class
def rpc(name, input, output)
raise DuplicateRpcName, name if rpc_descs.has_key?(name)
fail(DuplicateRpcName, name) if rpc_descs.key? name
assert_can_marshal(input)
assert_can_marshal(output)
rpc_descs[name] = RpcDesc.new(name, input, output,
@ -125,7 +122,8 @@ module Google::RPC
subclass.service_name = service_name
end
# the name of the instance method used to marshal events to a byte stream.
# the name of the instance method used to marshal events to a byte
# stream.
def marshal_class_method
@marshal_class_method ||= :marshal
end
@ -136,19 +134,14 @@ module Google::RPC
end
def assert_can_marshal(cls)
if cls.is_a?RpcDesc::Stream
cls = cls.type
end
cls = cls.type if cls.is_a? RpcDesc::Stream
mth = unmarshal_class_method
if !cls.methods.include?(mth)
raise ArgumentError, "#{cls} needs #{cls}.#{mth}"
unless cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
mth = marshal_class_method
if !cls.methods.include?(mth)
raise ArgumentError, "#{cls} needs #{cls}.#{mth}"
end
return if cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
# @param cls [Class] the class of a serializable type
@ -169,7 +162,6 @@ module Google::RPC
descs = rpc_descs
route_prefix = service_name
Class.new(ClientStub) do
# @param host [String] the host the stub connects to
# @param kw [KeywordArgs] the channel arguments, plus any optional
# args for configuring the client's channel
@ -184,20 +176,21 @@ module Google::RPC
marshal = desc.marshal_proc
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.is_request_response?
if desc.request_response?
define_method(mth_name) do |req, deadline = nil|
logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline)
end
elsif desc.is_client_streamer?
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil|
logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline)
end
elsif desc.is_server_streamer?
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, &blk)
server_streamer(route, req, marshal, unmarshal, deadline,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, &blk|
@ -206,9 +199,7 @@ module Google::RPC
end
end
end
end
end
# Asserts that the appropriate methods are defined for each added rpc
@ -217,18 +208,16 @@ module Google::RPC
def assert_rpc_descs_have_methods
rpc_descs.each_pair do |m, spec|
mth_name = m.to_s.underscore.to_sym
if !self.instance_methods.include?(mth_name)
raise "#{self} does not provide instance method '#{mth_name}'"
unless instance_methods.include?(mth_name)
fail "#{self} does not provide instance method '#{mth_name}'"
end
spec.assert_arity_matches(self.instance_method(mth_name))
spec.assert_arity_matches(instance_method(mth_name))
end
end
end
def self.included(o)
o.extend(Dsl)
# Update to the use the service name including module. Proivde a default
# that can be nil e,g. when modules are declared dynamically.
return unless o.service_name.nil?
@ -243,7 +232,6 @@ module Google::RPC
end
end
end
end
end
end

@ -28,6 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
module Google
# Google::RPC contains the General RPC module.
module RPC
VERSION = '0.0.1'
end

@ -30,7 +30,6 @@
require 'grpc'
describe 'Wrapped classes where .new cannot create an instance' do
describe GRPC::Core::Event do
it 'should fail .new fail with a runtime error' do
expect { GRPC::Core::Event.new }.to raise_error(TypeError)
@ -42,5 +41,4 @@ describe 'Wrapped classes where .new cannot create an instance' do
expect { GRPC::Core::Event.new }.to raise_error(TypeError)
end
end
end

@ -30,9 +30,7 @@
require 'grpc'
describe GRPC::Core::ByteBuffer do
describe '#new' do
it 'is constructed from a string' do
expect { GRPC::Core::ByteBuffer.new('#new') }.not_to raise_error
end
@ -50,7 +48,6 @@ describe GRPC::Core::ByteBuffer do
expect { GRPC::Core::ByteBuffer.new(x) }.to raise_error TypeError
end
end
end
describe '#to_s' do
@ -67,5 +64,4 @@ describe GRPC::Core::ByteBuffer do
expect(a_copy.dup.to_s).to eq('#dup')
end
end
end

@ -33,20 +33,19 @@ require 'port_picker'
include GRPC::Core::StatusCodes
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {
:OK => 0,
:ERROR => 1,
:NOT_ON_SERVER => 2,
:NOT_ON_CLIENT => 3,
:ALREADY_ACCEPTED => 4,
:ALREADY_INVOKED => 5,
:NOT_INVOKED => 6,
:ALREADY_FINISHED => 7,
:TOO_MANY_OPERATIONS => 8,
:INVALID_FLAGS => 9,
:ErrorMessages => {
OK: 0,
ERROR: 1,
NOT_ON_SERVER: 2,
NOT_ON_CLIENT: 3,
ALREADY_ACCEPTED: 4,
ALREADY_INVOKED: 5,
NOT_INVOKED: 6,
ALREADY_FINISHED: 7,
TOO_MANY_OPERATIONS: 8,
INVALID_FLAGS: 9,
ErrorMessages: {
0 => 'ok',
1 => 'unknown error',
2 => 'not available on a server',
@ -56,7 +55,7 @@ describe GRPC::Core::RpcErrors do
6 => 'call is not yet invoked',
7 => 'call is already finished',
8 => 'outstanding read or write present',
9=>'a bad flag was given',
9 => 'a bad flag was given'
}
}
end
@ -66,11 +65,9 @@ describe GRPC::Core::RpcErrors do
syms_and_codes = m.constants.collect { |c| [c, m.const_get(c)] }
expect(Hash[syms_and_codes]).to eq(@known_types)
end
end
describe GRPC::Core::Call do
before(:each) do
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@ -88,7 +85,7 @@ describe GRPC::Core::Call do
describe '#start_read' do
it 'should fail if called immediately' do
blk = Proc.new { make_test_call.start_read(@tag) }
blk = proc { make_test_call.start_read(@tag) }
expect(&blk).to raise_error GRPC::Core::CallError
end
end
@ -96,21 +93,21 @@ describe GRPC::Core::Call do
describe '#start_write' do
it 'should fail if called immediately' do
bytes = GRPC::Core::ByteBuffer.new('test string')
blk = Proc.new { make_test_call.start_write(bytes, @tag) }
blk = proc { make_test_call.start_write(bytes, @tag) }
expect(&blk).to raise_error GRPC::Core::CallError
end
end
describe '#start_write_status' do
it 'should fail if called immediately' do
blk = Proc.new { make_test_call.start_write_status(153, 'x', @tag) }
blk = proc { make_test_call.start_write_status(153, 'x', @tag) }
expect(&blk).to raise_error GRPC::Core::CallError
end
end
describe '#writes_done' do
it 'should fail if called immediately' do
blk = Proc.new { make_test_call.writes_done(Object.new) }
blk = proc { make_test_call.writes_done(Object.new) }
expect(&blk).to raise_error GRPC::Core::CallError
end
end
@ -119,7 +116,8 @@ describe GRPC::Core::Call do
it 'adds metadata to a call without fail' do
call = make_test_call
n = 37
metadata = Hash[n.times.collect { |i| ["key%d" % i, "value%d" %i] } ]
one_md = proc { |x| [sprintf('key%d', x), sprintf('value%d', x)] }
metadata = Hash[n.times.collect { |i| one_md.call i }]
expect { call.add_metadata(metadata) }.to_not raise_error
end
end
@ -191,7 +189,6 @@ describe GRPC::Core::Call do
end
end
def make_test_call
@ch.create_call('dummy_method', 'dummy_host', deadline)
end
@ -199,5 +196,4 @@ describe GRPC::Core::Call do
def deadline
Time.now + 2 # in 2 seconds; arbitrary
end
end

@ -37,8 +37,6 @@ def load_test_certs
end
describe GRPC::Core::Channel do
def create_test_cert
GRPC::Core::Credentials.new(load_test_certs[0])
end
@ -48,7 +46,6 @@ describe GRPC::Core::Channel do
end
shared_examples '#new' do
it 'take a host name without channel args' do
expect { GRPC::Core::Channel.new('dummy_host', nil) }.not_to raise_error
end
@ -61,14 +58,14 @@ describe GRPC::Core::Channel do
end
it 'does not take a hash with bad values as channel args' do
blk = construct_with_args(:symbol => Object.new)
blk = construct_with_args(symbol: Object.new)
expect(&blk).to raise_error TypeError
blk = construct_with_args('1' => Hash.new)
expect(&blk).to raise_error TypeError
end
it 'can take a hash with a symbol key as channel args' do
blk = construct_with_args(:a_symbol => 1)
blk = construct_with_args(a_symbol: 1)
expect(&blk).to_not raise_error
end
@ -78,17 +75,17 @@ describe GRPC::Core::Channel do
end
it 'can take a hash with a string value as channel args' do
blk = construct_with_args(:a_symbol => '1')
blk = construct_with_args(a_symbol: '1')
expect(&blk).to_not raise_error
end
it 'can take a hash with a symbol value as channel args' do
blk = construct_with_args(:a_symbol => :another_symbol)
blk = construct_with_args(a_symbol: :another_symbol)
expect(&blk).to_not raise_error
end
it 'can take a hash with a numeric value as channel args' do
blk = construct_with_args(:a_symbol => 1)
blk = construct_with_args(a_symbol: 1)
expect(&blk).to_not raise_error
end
@ -97,13 +94,11 @@ describe GRPC::Core::Channel do
blk = construct_with_args(args)
expect(&blk).to_not raise_error
end
end
describe '#new for secure channels' do
def construct_with_args(a)
Proc.new { GRPC::Core::Channel.new('dummy_host', a, create_test_cert) }
proc { GRPC::Core::Channel.new('dummy_host', a, create_test_cert) }
end
it_behaves_like '#new'
@ -113,7 +108,7 @@ describe GRPC::Core::Channel do
it_behaves_like '#new'
def construct_with_args(a)
Proc.new { GRPC::Core::Channel.new('dummy_host', a) }
proc { GRPC::Core::Channel.new('dummy_host', a) }
end
end
@ -125,7 +120,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = Proc.new do
blk = proc do
ch.create_call('dummy_method', 'dummy_host', deadline)
end
expect(&blk).to_not raise_error
@ -138,12 +133,11 @@ describe GRPC::Core::Channel do
ch.close
deadline = Time.now + 5
blk = Proc.new do
blk = proc do
ch.create_call('dummy_method', 'dummy_host', deadline)
end
expect(&blk).to raise_error(RuntimeError)
end
end
describe '#destroy' do
@ -151,7 +145,7 @@ describe GRPC::Core::Channel do
port = find_unused_tcp_port
host = "localhost:#{port}"
ch = GRPC::Core::Channel.new(host, nil)
blk = Proc.new { ch.destroy }
blk = proc { ch.destroy }
expect(&blk).to_not raise_error
end
@ -159,18 +153,16 @@ describe GRPC::Core::Channel do
port = find_unused_tcp_port
host = "localhost:#{port}"
ch = GRPC::Core::Channel.new(host, nil)
blk = Proc.new { ch.destroy }
blk = proc { ch.destroy }
blk.call
expect(&blk).to_not raise_error
end
end
describe '::SSL_TARGET' do
it 'is a symbol' do
expect(GRPC::Core::Channel::SSL_TARGET).to be_a(Symbol)
end
end
describe '#close' do
@ -178,7 +170,7 @@ describe GRPC::Core::Channel do
port = find_unused_tcp_port
host = "localhost:#{port}"
ch = GRPC::Core::Channel.new(host, nil)
blk = Proc.new { ch.close }
blk = proc { ch.close }
expect(&blk).to_not raise_error
end
@ -186,10 +178,9 @@ describe GRPC::Core::Channel do
port = find_unused_tcp_port
host = "localhost:#{port}"
ch = GRPC::Core::Channel.new(host, nil)
blk = Proc.new { ch.close }
blk = proc { ch.close }
blk.call
expect(&blk).to_not raise_error
end
end
end

@ -41,7 +41,6 @@ def load_test_certs
end
shared_context 'setup: tags' do
before(:example) do
@server_finished_tag = Object.new
@client_finished_tag = Object.new
@ -71,7 +70,7 @@ shared_context 'setup: tags' do
expect(ev).not_to be_nil
expect(ev.type).to be(SERVER_RPC_NEW)
ev.call.server_accept(@server_queue, @server_finished_tag)
ev.call.server_end_initial_metadata()
ev.call.server_end_initial_metadata
ev.call.start_read(@server_tag)
ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
expect(ev.type).to be(READ)
@ -79,7 +78,7 @@ shared_context 'setup: tags' do
ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
expect(ev).not_to be_nil
expect(ev.type).to be(WRITE_ACCEPTED)
return ev.call
ev.call
end
def client_sends(call, sent = 'a message')
@ -92,17 +91,15 @@ shared_context 'setup: tags' do
ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE)
expect(ev).not_to be_nil
expect(ev.type).to be(WRITE_ACCEPTED)
return sent
sent
end
def new_client_call
@ch.create_call('/method', 'localhost', deadline)
end
end
shared_examples 'basic GRPC message delivery is OK' do
include_context 'setup: tags'
it 'servers receive requests from clients and start responding' do
@ -126,7 +123,7 @@ shared_examples 'basic GRPC message delivery is OK' do
# the server response
server_call.start_write(reply, @server_tag)
ev = expect_next_event_on(@server_queue, WRITE_ACCEPTED, @server_tag)
expect_next_event_on(@server_queue, WRITE_ACCEPTED, @server_tag)
end
it 'responses written by servers are received by the client' do
@ -135,15 +132,14 @@ shared_examples 'basic GRPC message delivery is OK' do
server_receives_and_responds_with('server_response')
call.start_read(@tag)
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
ev = expect_next_event_on(@client_queue, READ, @tag)
expect(ev.result.to_s).to eq('server_response')
end
it 'servers can ignore a client write and send a status' do
reply = ByteBuffer.new('the server payload')
call = new_client_call
msg = client_sends(call)
client_sends(call)
# check the server rpc new was received
@server.request_call(@server_tag)
@ -153,20 +149,20 @@ shared_examples 'basic GRPC message delivery is OK' do
# accept the call - need to do this to sent status.
server_call = ev.call
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
server_call.server_end_initial_metadata
server_call.start_write_status(StatusCodes::NOT_FOUND, 'not found',
@server_tag)
# client gets an empty response for the read, preceeded by some metadata.
call.start_read(@tag)
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
ev = expect_next_event_on(@client_queue, READ, @tag)
expect(ev.tag).to be(@tag)
expect(ev.result.to_s).to eq('')
# finally, after client sends writes_done, they get the finished.
call.writes_done(@tag)
ev = expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag)
expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag)
ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag)
expect(ev.result.code).to eq(StatusCodes::NOT_FOUND)
end
@ -175,12 +171,12 @@ shared_examples 'basic GRPC message delivery is OK' do
call = new_client_call
client_sends(call)
server_call = server_receives_and_responds_with('server_response')
server_call.start_write_status(10101, 'status code is 10101', @server_tag)
server_call.start_write_status(10_101, 'status code is 10101', @server_tag)
# first the client says writes are done
call.start_read(@tag)
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
ev = expect_next_event_on(@client_queue, READ, @tag)
expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
expect_next_event_on(@client_queue, READ, @tag)
call.writes_done(@tag)
# but nothing happens until the server sends a status
@ -192,24 +188,23 @@ shared_examples 'basic GRPC message delivery is OK' do
expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag)
ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag)
expect(ev.result.details).to eq('status code is 10101')
expect(ev.result.code).to eq(10101)
expect(ev.result.code).to eq(10_101)
end
end
shared_examples 'GRPC metadata delivery works OK' do
include_context 'setup: tags'
describe 'from client => server' do
before(:example) do
n = 7 # arbitrary number of metadata
diff_keys = Hash[n.times.collect { |i| ['k%d' % i, 'v%d' % i] }]
null_vals = Hash[n.times.collect { |i| ['k%d' % i, 'v\0%d' % i] }]
same_keys = Hash[n.times.collect { |i| ['k%d' % i, ['v%d' % i] * n] }]
symbol_key = {:a_key => 'a val'}
diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] }
diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }]
null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] }
null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }]
same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] }
same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }]
symbol_key = { a_key: 'a val' }
@valid_metadata = [diff_keys, same_keys, null_vals, symbol_key]
@bad_keys = []
@bad_keys << { Object.new => 'a value' }
@ -239,7 +234,7 @@ shared_examples 'GRPC metadata delivery works OK' do
# Client begins a call OK
call.start_invoke(@client_queue, @tag, @tag, @client_finished_tag)
ev = expect_next_event_on(@client_queue, INVOKE_ACCEPTED, @tag)
expect_next_event_on(@client_queue, INVOKE_ACCEPTED, @tag)
# ... server has all metadata available even though the client did not
# send a write
@ -250,17 +245,18 @@ shared_examples 'GRPC metadata delivery works OK' do
expect(result.merge(replace_symbols)).to eq(result)
end
end
end
describe 'from server => client' do
before(:example) do
n = 7 # arbitrary number of metadata
diff_keys = Hash[n.times.collect { |i| ['k%d' % i, 'v%d' % i] }]
null_vals = Hash[n.times.collect { |i| ['k%d' % i, 'v\0%d' % i] }]
same_keys = Hash[n.times.collect { |i| ['k%d' % i, ['v%d' % i] * n] }]
symbol_key = {:a_key => 'a val'}
diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] }
diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }]
null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] }
null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }]
same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] }
same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }]
symbol_key = { a_key: 'a val' }
@valid_metadata = [diff_keys, same_keys, null_vals, symbol_key]
@bad_keys = []
@bad_keys << { Object.new => 'a value' }
@ -290,7 +286,7 @@ shared_examples 'GRPC metadata delivery works OK' do
# ... server accepts the call without adding metadata
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
server_call.server_end_initial_metadata
# ... these server sends some data, allowing the metadata read
server_call.start_write(ByteBuffer.new('reply with metadata'),
@ -300,7 +296,7 @@ shared_examples 'GRPC metadata delivery works OK' do
# there is the HTTP status metadata, though there should not be any
# TODO(temiola): update this with the bug number to be resolved
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
expect(ev.result).to eq({':status' => '200'})
expect(ev.result).to eq(':status' => '200')
end
it 'sends all the pairs and status:200 when keys and values are valid' do
@ -316,7 +312,7 @@ shared_examples 'GRPC metadata delivery works OK' do
# ... server adds metadata and accepts the call
server_call.add_metadata(md)
server_call.server_accept(@server_queue, @server_finished_tag)
server_call.server_end_initial_metadata()
server_call.server_end_initial_metadata
# Now the client can read the metadata
ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, @tag)
@ -324,16 +320,11 @@ shared_examples 'GRPC metadata delivery works OK' do
replace_symbols[':status'] = '200'
expect(ev.result).to eq(replace_symbols)
end
end
end
end
describe 'the http client/server' do
before(:example) do
port = find_unused_tcp_port
host = "localhost:#{port}"
@ -354,11 +345,9 @@ describe 'the http client/server' do
it_behaves_like 'GRPC metadata delivery works OK' do
end
end
describe 'the secure http client/server' do
before(:example) do
certs = load_test_certs
port = find_unused_tcp_port
@ -383,5 +372,4 @@ describe 'the secure http client/server' do
it_behaves_like 'GRPC metadata delivery works OK' do
end
end

@ -30,7 +30,6 @@
require 'grpc'
describe GRPC::Core::CompletionQueue do
describe '#new' do
it 'is constructed successufully' do
expect { GRPC::Core::CompletionQueue.new }.not_to raise_error
@ -53,7 +52,6 @@ describe GRPC::Core::CompletionQueue do
expect { ch.next(a_time) }.not_to raise_error
end
end
end
describe '#pluck' do
@ -74,8 +72,5 @@ describe GRPC::Core::CompletionQueue do
expect { ch.pluck(tag, a_time) }.not_to raise_error
end
end
end
end

@ -29,7 +29,6 @@
require 'grpc'
def load_test_certs
test_root = File.join(File.dirname(__FILE__), 'testdata')
files = ['ca.pem', 'server1.pem', 'server1.key']
@ -39,9 +38,7 @@ end
Credentials = GRPC::Core::Credentials
describe Credentials do
describe '#new' do
it 'can be constructed with fake inputs' do
expect { Credentials.new('root_certs', 'key', 'cert') }.not_to raise_error
end
@ -58,30 +55,23 @@ describe Credentials do
it 'cannot be constructed with a nil server roots' do
_, client_key, client_chain = load_test_certs
blk = Proc.new { Credentials.new(nil, client_key, client_chain) }
blk = proc { Credentials.new(nil, client_key, client_chain) }
expect(&blk).to raise_error
end
end
describe '#compose' do
it 'can be completed OK' do
certs = load_test_certs
cred1 = Credentials.new(*certs)
cred2 = Credentials.new(*certs)
expect { cred1.compose(cred2) }.to_not raise_error
end
end
describe 'Credentials#default' do
it 'is not implemented yet' do
expect { Credentials.default() }.to raise_error RuntimeError
expect { Credentials.default }.to raise_error RuntimeError
end
end
end

@ -30,25 +30,23 @@
require 'grpc'
describe GRPC::Core::CompletionType do
before(:each) do
@known_types = {
:QUEUE_SHUTDOWN => 0,
:READ => 1,
:INVOKE_ACCEPTED => 2,
:WRITE_ACCEPTED => 3,
:FINISH_ACCEPTED => 4,
:CLIENT_METADATA_READ => 5,
:FINISHED => 6,
:SERVER_RPC_NEW => 7,
:RESERVED => 8
QUEUE_SHUTDOWN: 0,
READ: 1,
INVOKE_ACCEPTED: 2,
WRITE_ACCEPTED: 3,
FINISH_ACCEPTED: 4,
CLIENT_METADATA_READ: 5,
FINISHED: 6,
SERVER_RPC_NEW: 7,
RESERVED: 8
}
end
it 'should have all the known types' do
mod = GRPC::Core::CompletionType
blk = Proc.new { Hash[mod.constants.collect { |c| [c, mod.const_get(c)] }] }
blk = proc { Hash[mod.constants.collect { |c| [c, mod.const_get(c)] }] }
expect(blk.call).to eq(@known_types)
end
end

@ -38,9 +38,9 @@ describe GRPC::ActiveCall do
CompletionType = GRPC::Core::CompletionType
before(:each) do
@pass_through = Proc.new { |x| x }
@pass_through = proc { |x| x }
@server_tag = Object.new
@server_done_tag, meta_tag = Object.new
@server_done_tag = Object.new
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@ -70,7 +70,7 @@ describe GRPC::ActiveCall do
describe '#multi_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = ['cancelled', 'deadline', 'each_remote_read', 'shutdown']
want = %w(cancelled, deadline, each_remote_read, shutdown)
v = @client_call.multi_req_view
want.each do |w|
expect(v.methods.include?(w))
@ -80,7 +80,7 @@ describe GRPC::ActiveCall do
describe '#single_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = ['cancelled', 'deadline', 'shutdown']
want = %w(cancelled, deadline, shutdown)
v = @client_call.single_req_view
want.each do |w|
expect(v.methods.include?(w))
@ -110,7 +110,7 @@ describe GRPC::ActiveCall do
# Accept the call, and verify that the server reads the response ok.
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata()
ev.call.server_end_initial_metadata
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq(msg)
@ -120,7 +120,7 @@ describe GRPC::ActiveCall do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
marshal = Proc.new { |x| 'marshalled:' + x }
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline,
finished_tag: done_tag,
@ -132,33 +132,29 @@ describe GRPC::ActiveCall do
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata()
ev.call.server_end_initial_metadata
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
end
describe '#client_start_invoke' do
it 'sends keywords as metadata to the server when the are present' do
call, pass_through = make_test_call, Proc.new { |x| x }
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline, k1: 'v1',
k2: 'v2')
call = make_test_call
ActiveCall.client_start_invoke(call, @client_queue, deadline,
k1: 'v1', k2: 'v2')
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
expect(ev).to_not be_nil
expect(ev.result.metadata['k1']).to eq('v1')
expect(ev.result.metadata['k2']).to eq('v2')
end
end
describe '#remote_read' do
it 'reads the response sent by a server' do
call, pass_through = make_test_call, Proc.new { |x| x }
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@ -173,7 +169,7 @@ describe GRPC::ActiveCall do
end
it 'saves metadata { status=200 } when the server adds no metadata' do
call, pass_through = make_test_call, Proc.new { |x| x }
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@ -186,11 +182,11 @@ describe GRPC::ActiveCall do
server_call.remote_send('ignore me')
expect(client_call.metadata).to be_nil
client_call.remote_read
expect(client_call.metadata).to eq({':status' => '200'})
expect(client_call.metadata).to eq(':status' => '200')
end
it 'saves metadata add by the server' do
call, pass_through = make_test_call, Proc.new { |x| x }
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@ -203,13 +199,12 @@ describe GRPC::ActiveCall do
server_call.remote_send('ignore me')
expect(client_call.metadata).to be_nil
client_call.remote_read
expect(client_call.metadata).to eq({':status' => '200', 'k1' => 'v1',
'k2' => 'v2'})
expected = { ':status' => '200', 'k1' => 'v1', 'k2' => 'v2' }
expect(client_call.metadata).to eq(expected)
end
it 'get a nil msg before a status when an OK status is sent' do
call, pass_through = make_test_call, Proc.new { |x| x }
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@ -227,12 +222,11 @@ describe GRPC::ActiveCall do
expect(res).to be_nil
end
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
unmarshal = Proc.new { |x| 'unmarshalled:' + x }
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
finished_tag: done_tag,
@ -245,7 +239,6 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('unmarshalled:server_response')
end
end
describe '#each_remote_read' do
@ -298,7 +291,6 @@ describe GRPC::ActiveCall do
server_call.send_status(OK, 'OK')
expect { e.next }.to raise_error(StopIteration)
end
end
describe '#writes_done' do
@ -357,7 +349,6 @@ describe GRPC::ActiveCall do
expect { client_call.writes_done(true) }.to_not raise_error
expect { server_call.finished }.to_not raise_error
end
end
def expect_server_to_receive(sent_text, **kw)
@ -371,7 +362,7 @@ describe GRPC::ActiveCall do
ev = @server_queue.next(deadline)
ev.call.add_metadata(kw)
ev.call.server_accept(@client_queue, @server_done_tag)
ev.call.server_end_initial_metadata()
ev.call.server_end_initial_metadata
ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: @server_done_tag)
@ -384,5 +375,4 @@ describe GRPC::ActiveCall do
def deadline
Time.now + 0.25 # in 0.25 seconds; arbitrary
end
end

@ -31,7 +31,7 @@ require 'grpc'
require 'xray/thread_dump_signal_handler'
require_relative '../port_picker'
NOOP = Proc.new { |x| x }
NOOP = proc { |x| x }
def wakey_thread(&blk)
awake_mutex, awake_cond = Mutex.new, ConditionVariable.new
@ -52,7 +52,6 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
describe 'ClientStub' do
before(:each) do
Thread.abort_on_exception = true
@server = nil
@ -67,11 +66,10 @@ describe 'ClientStub' do
end
describe '#new' do
it 'can be created from a host and args' do
host = new_test_host
opts = {:a_channel_arg => 'an_arg'}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg' }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).not_to raise_error
@ -79,8 +77,8 @@ describe 'ClientStub' do
it 'can be created with a default deadline' do
host = new_test_host
opts = {:a_channel_arg => 'an_arg', :deadline => 5}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg', deadline: 5 }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).not_to raise_error
@ -88,8 +86,8 @@ describe 'ClientStub' do
it 'can be created with an channel override' do
host = new_test_host
opts = {:a_channel_arg => 'an_arg', :channel_override => @ch}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg', channel_override: @ch }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).not_to raise_error
@ -97,8 +95,8 @@ describe 'ClientStub' do
it 'cannot be created with a bad channel override' do
host = new_test_host
blk = Proc.new do
opts = {:a_channel_arg => 'an_arg', :channel_override => Object.new}
blk = proc do
opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).to raise_error
@ -106,8 +104,8 @@ describe 'ClientStub' do
it 'cannot be created with bad credentials' do
host = new_test_host
blk = Proc.new do
opts = {:a_channel_arg => 'an_arg', :creds => Object.new}
blk = proc do
opts = { a_channel_arg: 'an_arg', creds: Object.new }
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).to raise_error
@ -116,17 +114,16 @@ describe 'ClientStub' do
it 'can be created with test test credentials' do
certs = load_test_certs
host = new_test_host
blk = Proc.new do
blk = proc do
opts = {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com',
:a_channel_arg => 'an_arg',
:creds => GRPC::Core::Credentials.new(certs[0], nil, nil)
a_channel_arg: 'an_arg',
creds: GRPC::Core::Credentials.new(certs[0], nil, nil)
}
GRPC::ClientStub.new(host, @cq, **opts)
end
expect(&blk).to_not raise_error
end
end
describe '#request_response' do
@ -135,7 +132,6 @@ describe 'ClientStub' do
end
shared_examples 'request response' do
it 'should send a request to/receive a reply from a server' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass)
@ -146,8 +142,8 @@ describe 'ClientStub' do
it 'should send metadata to the server ok' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass, k1: 'v1',
k2: 'v2')
th = run_request_response(host, @sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
expect(get_response(stub)).to eq(@resp)
th.join
@ -157,7 +153,10 @@ describe 'ClientStub' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
update_md = proc do |md|
md[:k1] = 'updated-v1'
md
end
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
expect(get_response(stub)).to eq(@resp)
th.join
@ -176,43 +175,35 @@ describe 'ClientStub' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq)
blk = Proc.new { get_response(stub) }
blk = proc { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
end
describe 'without a call operation' do
def get_response(stub)
stub.request_response(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
k2: 'v2')
stub.request_response(@method, @sent_msg, NOOP, NOOP,
k1: 'v1', k2: 'v2')
end
it_behaves_like 'request response'
end
describe 'via a call operation' do
def get_response(stub)
op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute()
op.execute
end
it_behaves_like 'request response'
end
end
describe '#client_streamer' do
shared_examples 'client streaming' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
@ -228,19 +219,21 @@ describe 'ClientStub' do
it 'should send metadata to the server ok' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @pass, k1: 'v1',
k2: 'v2')
th = run_client_streamer(host, @sent_msgs, @resp, @pass,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
expect(get_response(stub)).to eq(@resp)
th.join
end
it 'should update the sent metadata with a provided metadata updater' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
update_md = proc do |md|
md[:k1] = 'updated-v1'
md
end
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
expect(get_response(stub)).to eq(@resp)
th.join
@ -250,43 +243,35 @@ describe 'ClientStub' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq)
blk = Proc.new { get_response(stub) }
blk = proc { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
end
describe 'without a call operation' do
def get_response(stub)
stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, k1: 'v1',
k2: 'v2')
stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
k1: 'v1', k2: 'v2')
end
it_behaves_like 'client streaming'
end
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
resp = op.execute()
op.execute
end
it_behaves_like 'client streaming'
end
end
describe '#server_streamer' do
shared_examples 'server streaming' do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@ -311,8 +296,8 @@ describe 'ClientStub' do
it 'should send metadata to the server ok' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @fail, k1: 'v1',
k2: 'v2')
th = run_server_streamer(host, @sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
@ -323,55 +308,50 @@ describe 'ClientStub' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
update_md = proc do |md|
md[:k1] = 'updated-v1'
md
end
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
end
describe 'without a call operation' do
def get_responses(stub)
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
k2: 'v2')
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
k1: 'v1', k2: 'v2')
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'server streaming'
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute()
e = op.execute
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'server streaming'
end
end
describe '#bidi_streamer' do
shared_examples 'bidi streaming' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
end
it 'supports sending all the requests first', :bidi => true do
it 'supports sending all the requests first', bidi: true do
host = new_test_host
th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys,
@pass)
@ -381,7 +361,7 @@ describe 'ClientStub' do
th.join
end
it 'supports client-initiated ping pong', :bidi => true do
it 'supports client-initiated ping pong', bidi: true do
host = new_test_host
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(host, @cq)
@ -396,7 +376,7 @@ describe 'ClientStub' do
# servers don't know if all the client metadata has been sent until
# they receive a message from the client. Without receiving all the
# metadata, the server does not accept the call, so this test hangs.
xit 'supports a server-initiated ping pong', :bidi => true do
xit 'supports a server-initiated ping pong', bidi: true do
host = new_test_host
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(host, @cq)
@ -404,11 +384,9 @@ describe 'ClientStub' do
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
end
describe 'without a call operation' do
def get_responses(stub)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(e).to be_a(Enumerator)
@ -416,13 +394,12 @@ describe 'ClientStub' do
end
it_behaves_like 'bidi streaming'
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, return_op:true)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op: true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
expect(e).to be_a(Enumerator)
@ -430,9 +407,7 @@ describe 'ClientStub' do
end
it_behaves_like 'bidi streaming'
end
end
def run_server_streamer(hostname, expected_input, replys, status, **kw)
@ -514,14 +489,13 @@ describe 'ClientStub' do
def expect_server_to_be_invoked(hostname, awake_mutex, awake_cond)
server_queue = start_test_server(hostname, awake_mutex, awake_cond)
test_deadline = Time.now + 10 # fail tests after 10 seconds
ev = server_queue.pluck(@server_tag, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
fail OutOfTime if ev.nil?
server_call = ev.call
server_call.metadata = ev.result.metadata
finished_tag = Object.new
server_call.server_accept(server_queue, finished_tag)
server_call.server_end_initial_metadata()
server_call.server_end_initial_metadata
GRPC::ActiveCall.new(server_call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
finished_tag: finished_tag)
end
@ -530,5 +504,4 @@ describe 'ClientStub' do
port = find_unused_tcp_port
"localhost:#{port}"
end
end

@ -30,9 +30,7 @@
require 'grpc'
require 'grpc/generic/rpc_desc'
describe GRPC::RpcDesc do
RpcDesc = GRPC::RpcDesc
Stream = RpcDesc::Stream
OK = GRPC::Core::StatusCodes::OK
@ -56,7 +54,6 @@ describe GRPC::RpcDesc do
end
describe '#run_server_method' do
describe 'for request responses' do
before(:each) do
@call = double('active_call')
@ -78,7 +75,7 @@ describe GRPC::RpcDesc do
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(EventError)
blk = Proc.new do
blk = proc do
@request_response.run_server_method(@call, method(:fake_reqresp))
end
expect(&blk).to_not raise_error
@ -86,7 +83,7 @@ describe GRPC::RpcDesc do
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(CallError)
blk = Proc.new do
blk = proc do
@request_response.run_server_method(@call, method(:fake_reqresp))
end
expect(&blk).to_not raise_error
@ -100,7 +97,6 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:finished).once
@request_response.run_server_method(@call, method(:fake_reqresp))
end
end
describe 'for client streamers' do
@ -122,7 +118,7 @@ describe GRPC::RpcDesc do
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(EventError)
blk = Proc.new do
blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
expect(&blk).to_not raise_error
@ -130,20 +126,18 @@ describe GRPC::RpcDesc do
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(CallError)
blk = Proc.new do
blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
expect(&blk).to_not raise_error
end
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
describe 'for server streaming' do
@ -167,7 +161,7 @@ describe GRPC::RpcDesc do
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(EventError)
blk = Proc.new do
blk = proc do
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
expect(&blk).to_not raise_error
@ -175,7 +169,7 @@ describe GRPC::RpcDesc do
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(CallError)
blk = Proc.new do
blk = proc do
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
expect(&blk).to_not raise_error
@ -189,7 +183,6 @@ describe GRPC::RpcDesc do
expect(@call).to receive(:finished).once
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
describe 'for bidi streamers' do
@ -215,30 +208,27 @@ describe GRPC::RpcDesc do
end
it 'closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:run_server_bidi)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end
end
describe '#assert_arity_matches' do
def no_arg
end
def fake_clstream(arg)
def fake_clstream(_arg)
end
def fake_svstream(arg1, arg2)
def fake_svstream(_arg1, _arg2)
end
it 'raises when a request_response does not have 2 args' do
[:fake_clstream, :no_arg].each do |mth|
blk = Proc.new do
blk = proc do
@request_response.assert_arity_matches(method(mth))
end
expect(&blk).to raise_error
@ -246,7 +236,7 @@ describe GRPC::RpcDesc do
end
it 'passes when a request_response has 2 args' do
blk = Proc.new do
blk = proc do
@request_response.assert_arity_matches(method(:fake_svstream))
end
expect(&blk).to_not raise_error
@ -254,7 +244,7 @@ describe GRPC::RpcDesc do
it 'raises when a server_streamer does not have 2 args' do
[:fake_clstream, :no_arg].each do |mth|
blk = Proc.new do
blk = proc do
@server_streamer.assert_arity_matches(method(mth))
end
expect(&blk).to raise_error
@ -262,7 +252,7 @@ describe GRPC::RpcDesc do
end
it 'passes when a server_streamer has 2 args' do
blk = Proc.new do
blk = proc do
@server_streamer.assert_arity_matches(method(:fake_svstream))
end
expect(&blk).to_not raise_error
@ -270,7 +260,7 @@ describe GRPC::RpcDesc do
it 'raises when a client streamer does not have 1 arg' do
[:fake_svstream, :no_arg].each do |mth|
blk = Proc.new do
blk = proc do
@client_streamer.assert_arity_matches(method(mth))
end
expect(&blk).to raise_error
@ -278,16 +268,15 @@ describe GRPC::RpcDesc do
end
it 'passes when a client_streamer has 1 arg' do
blk = Proc.new do
blk = proc do
@client_streamer.assert_arity_matches(method(:fake_clstream))
end
expect(&blk).to_not raise_error
end
it 'raises when a bidi streamer does not have 1 arg' do
[:fake_svstream, :no_arg].each do |mth|
blk = Proc.new do
blk = proc do
@bidi_streamer.assert_arity_matches(method(mth))
end
expect(&blk).to raise_error
@ -295,88 +284,78 @@ describe GRPC::RpcDesc do
end
it 'passes when a bidi streamer has 1 arg' do
blk = Proc.new do
blk = proc do
@bidi_streamer.assert_arity_matches(method(:fake_clstream))
end
expect(&blk).to_not raise_error
end
end
describe '#is_request_response?' do
describe '#request_response?' do
it 'is true only input and output are both not Streams' do
expect(@request_response.is_request_response?).to be(true)
expect(@client_streamer.is_request_response?).to be(false)
expect(@bidi_streamer.is_request_response?).to be(false)
expect(@server_streamer.is_request_response?).to be(false)
expect(@request_response.request_response?).to be(true)
expect(@client_streamer.request_response?).to be(false)
expect(@bidi_streamer.request_response?).to be(false)
expect(@server_streamer.request_response?).to be(false)
end
end
describe '#is_client_streamer?' do
describe '#client_streamer?' do
it 'is true only when input is a Stream and output is not a Stream' do
expect(@client_streamer.is_client_streamer?).to be(true)
expect(@request_response.is_client_streamer?).to be(false)
expect(@server_streamer.is_client_streamer?).to be(false)
expect(@bidi_streamer.is_client_streamer?).to be(false)
expect(@client_streamer.client_streamer?).to be(true)
expect(@request_response.client_streamer?).to be(false)
expect(@server_streamer.client_streamer?).to be(false)
expect(@bidi_streamer.client_streamer?).to be(false)
end
end
describe '#is_server_streamer?' do
describe '#server_streamer?' do
it 'is true only when output is a Stream and input is not a Stream' do
expect(@server_streamer.is_server_streamer?).to be(true)
expect(@client_streamer.is_server_streamer?).to be(false)
expect(@request_response.is_server_streamer?).to be(false)
expect(@bidi_streamer.is_server_streamer?).to be(false)
expect(@server_streamer.server_streamer?).to be(true)
expect(@client_streamer.server_streamer?).to be(false)
expect(@request_response.server_streamer?).to be(false)
expect(@bidi_streamer.server_streamer?).to be(false)
end
end
describe '#is_bidi_streamer?' do
describe '#bidi_streamer?' do
it 'is true only when output is a Stream and input is a Stream' do
expect(@bidi_streamer.is_bidi_streamer?).to be(true)
expect(@server_streamer.is_bidi_streamer?).to be(false)
expect(@client_streamer.is_bidi_streamer?).to be(false)
expect(@request_response.is_bidi_streamer?).to be(false)
expect(@bidi_streamer.bidi_streamer?).to be(true)
expect(@server_streamer.bidi_streamer?).to be(false)
expect(@client_streamer.bidi_streamer?).to be(false)
expect(@request_response.bidi_streamer?).to be(false)
end
end
def fake_reqresp(req, call)
def fake_reqresp(_req, _call)
@ok_response
end
def fake_clstream(call)
def fake_clstream(_call)
@ok_response
end
def fake_svstream(req, call)
def fake_svstream(_req, _call)
[@ok_response, @ok_response]
end
def fake_bidistream(an_array)
return an_array
an_array
end
def bad_status(req, call)
raise GRPC::BadStatus.new(@bs_code, 'NOK')
def bad_status(_req, _call)
fail GRPC::BadStatus.new(@bs_code, 'NOK')
end
def other_error(req, call)
raise ArgumentError.new('other error')
def other_error(_req, _call)
fail(ArgumentError, 'other error')
end
def bad_status_alt(call)
raise GRPC::BadStatus.new(@bs_code, 'NOK')
def bad_status_alt(_call)
fail GRPC::BadStatus.new(@bs_code, 'NOK')
end
def other_error_alt(call)
raise ArgumentError.new('other error')
def other_error_alt(_call)
fail(ArgumentError, 'other error')
end
end

@ -33,9 +33,7 @@ require 'xray/thread_dump_signal_handler'
Pool = GRPC::RpcServer::Pool
describe Pool do
describe '#new' do
it 'raises if a non-positive size is used' do
expect { Pool.new(0) }.to raise_error
expect { Pool.new(-1) }.to raise_error
@ -45,11 +43,9 @@ describe Pool do
it 'is constructed OK with a positive size' do
expect { Pool.new(1) }.not_to raise_error
end
end
describe '#jobs_waiting' do
it 'at start, it is zero' do
p = Pool.new(1)
expect(p.jobs_waiting).to be(0)
@ -57,74 +53,67 @@ describe Pool do
it 'it increases, with each scheduled job if the pool is not running' do
p = Pool.new(1)
job = Proc.new { }
job = proc {}
expect(p.jobs_waiting).to be(0)
5.times do |i|
p.schedule(&job)
expect(p.jobs_waiting).to be(i + 1)
end
end
it 'it decreases as jobs are run' do
p = Pool.new(1)
job = Proc.new { }
job = proc {}
expect(p.jobs_waiting).to be(0)
3.times do |i|
3.times do
p.schedule(&job)
end
p.start
sleep 2
expect(p.jobs_waiting).to be(0)
end
end
describe '#schedule' do
it 'throws if the pool is already stopped' do
p = Pool.new(1)
p.stop()
job = Proc.new { }
p.stop
job = proc {}
expect { p.schedule(&job) }.to raise_error
end
it 'adds jobs that get run by the pool' do
p = Pool.new(1)
p.start()
p.start
o, q = Object.new, Queue.new
job = Proc.new { q.push(o) }
job = proc { q.push(o) }
p.schedule(&job)
expect(q.pop).to be(o)
p.stop
end
end
describe '#stop' do
it 'works when there are no scheduled tasks' do
p = Pool.new(1)
expect { p.stop() }.not_to raise_error
expect { p.stop }.not_to raise_error
end
it 'stops jobs when there are long running jobs' do
p = Pool.new(1)
p.start()
p.start
o, q = Object.new, Queue.new
job = Proc.new do
job = proc do
sleep(5) # long running
q.push(o)
end
p.schedule(&job)
sleep(1) # should ensure the long job gets scheduled
expect { p.stop() }.not_to raise_error
expect { p.stop }.not_to raise_error
end
end
describe '#start' do
it 'runs pre-scheduled jobs' do
p = Pool.new(2)
o, q = Object.new, Queue.new
@ -146,7 +135,5 @@ describe Pool do
end
p.stop
end
end
end

@ -37,33 +37,37 @@ def load_test_certs
files.map { |f| File.open(File.join(test_root, f)).read }
end
# A test message
class EchoMsg
def self.marshal(o)
def self.marshal(_o)
''
end
def self.unmarshal(o)
def self.unmarshal(_o)
EchoMsg.new
end
end
# A test service with no methods.
class EmptyService
include GRPC::GenericService
end
# A test service without an implementation.
class NoRpcImplementation
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
end
# A test service with an implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
def initialize(default_var='ignored')
def initialize(_default_var = 'ignored')
end
def an_rpc(req, call)
def an_rpc(req, _call)
logger.info('echo service received a request')
req
end
@ -71,14 +75,15 @@ end
EchoStub = EchoService.rpc_stub_class
# A slow test service.
class SlowService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
def initialize(default_var='ignored')
def initialize(_default_var = 'ignored')
end
def an_rpc(req, call)
def an_rpc(req, _call)
delay = 0.25
logger.info("starting a slow #{delay} rpc")
sleep delay
@ -89,7 +94,6 @@ end
SlowStub = SlowService.rpc_stub_class
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
@ -97,7 +101,7 @@ describe GRPC::RpcServer do
@method = 'an_rpc_method'
@pass = 0
@fail = 1
@noop = Proc.new { |x| x }
@noop = proc { |x| x }
@server_queue = GRPC::Core::CompletionQueue.new
port = find_unused_tcp_port
@ -112,18 +116,17 @@ describe GRPC::RpcServer do
end
describe '#new' do
it 'can be created with just some args' do
opts = {:a_channel_arg => 'an_arg'}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg' }
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
opts = {:a_channel_arg => 'an_arg', :deadline => 5}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg', deadline: 5 }
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
@ -131,20 +134,20 @@ describe GRPC::RpcServer do
it 'can be created with a completion queue override' do
opts = {
:a_channel_arg => 'an_arg',
:completion_queue_override => @server_queue
a_channel_arg: 'an_arg',
completion_queue_override: @server_queue
}
blk = Proc.new do
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad completion queue override' do
blk = Proc.new do
blk = proc do
opts = {
:a_channel_arg => 'an_arg',
:completion_queue_override => Object.new
a_channel_arg: 'an_arg',
completion_queue_override: Object.new
}
RpcServer.new(**opts)
end
@ -152,10 +155,10 @@ describe GRPC::RpcServer do
end
it 'cannot be created with invalid ServerCredentials' do
blk = Proc.new do
blk = proc do
opts = {
:a_channel_arg => 'an_arg',
:creds => Object.new
a_channel_arg: 'an_arg',
creds: Object.new
}
RpcServer.new(**opts)
end
@ -165,10 +168,10 @@ describe GRPC::RpcServer do
it 'can be created with the creds as valid ServerCedentials' do
certs = load_test_certs
server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
blk = Proc.new do
blk = proc do
opts = {
:a_channel_arg => 'an_arg',
:creds => server_creds
a_channel_arg: 'an_arg',
creds: server_creds
}
RpcServer.new(**opts)
end
@ -176,30 +179,28 @@ describe GRPC::RpcServer do
end
it 'can be created with a server override' do
opts = {:a_channel_arg => 'an_arg', :server_override => @server}
blk = Proc.new do
opts = { a_channel_arg: 'an_arg', server_override: @server }
blk = proc do
RpcServer.new(**opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad server override' do
blk = Proc.new do
blk = proc do
opts = {
:a_channel_arg => 'an_arg',
:server_override => Object.new
a_channel_arg: 'an_arg',
server_override: Object.new
}
RpcServer.new(**opts)
end
expect(&blk).to raise_error
end
end
describe '#stopped?' do
before(:each) do
opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
opts = { a_channel_arg: 'an_arg', poll_period: 1 }
@srv = RpcServer.new(**opts)
end
@ -229,33 +230,31 @@ describe GRPC::RpcServer do
expect(@srv.stopped?).to be(true)
t.join
end
end
describe '#running?' do
it 'starts out false' do
opts = {:a_channel_arg => 'an_arg', :server_override => @server}
opts = { a_channel_arg: 'an_arg', server_override: @server }
r = RpcServer.new(**opts)
expect(r.running?).to be(false)
end
it 'is false after run is called with no services registered' do
opts = {
:a_channel_arg => 'an_arg',
:poll_period => 1,
:server_override => @server
a_channel_arg: 'an_arg',
poll_period: 1,
server_override: @server
}
r = RpcServer.new(**opts)
r.run()
r.run
expect(r.running?).to be(false)
end
it 'is true after run is called with a registered service' do
opts = {
:a_channel_arg => 'an_arg',
:poll_period => 1,
:server_override => @server
a_channel_arg: 'an_arg',
poll_period: 1,
server_override: @server
}
r = RpcServer.new(**opts)
r.handle(EchoService)
@ -265,13 +264,11 @@ describe GRPC::RpcServer do
r.stop
t.join
end
end
describe '#handle' do
before(:each) do
@opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
@opts = { a_channel_arg: 'an_arg', poll_period: 1 }
@srv = RpcServer.new(**@opts)
end
@ -309,33 +306,30 @@ describe GRPC::RpcServer do
@srv.handle(EchoService)
expect { r.handle(EchoService) }.to raise_error
end
end
describe '#run' do
before(:each) do
@client_opts = {
:channel_override => @ch
channel_override: @ch
}
@marshal = EchoService.rpc_descs[:an_rpc].marshal_proc
@unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output)
server_opts = {
:server_override => @server,
:completion_queue_override => @server_queue,
:poll_period => 1
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
end
describe 'when running' do
it 'should return NOT_FOUND status for requests on unknown methods' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
blk = Proc.new do
blk = proc do
cq = GRPC::Core::CompletionQueue.new
stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
stub.request_response('/unknown', req, @marshal, @unmarshal)
@ -352,20 +346,19 @@ describe GRPC::RpcServer do
req = EchoMsg.new
n = 5 # arbitrary
stub = EchoStub.new(@host, **@client_opts)
n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) }
n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
@srv.stop
t.join
end
it 'should obtain responses for multiple parallel requests' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
Thread.new { @srv.run }
@srv.wait_till_running
req, q = EchoMsg.new, Queue.new
n = 5 # arbitrary
threads = []
n.times do |x|
cq = GRPC::Core::CompletionQueue.new
n.times do
threads << Thread.new do
stub = EchoStub.new(@host, **@client_opts)
q << stub.an_rpc(req)
@ -373,44 +366,40 @@ describe GRPC::RpcServer do
end
n.times { expect(q.pop).to be_a(EchoMsg) }
@srv.stop
threads.each { |t| t.join }
threads.each(&:join)
end
it 'should return UNAVAILABLE status if there too many jobs' do
opts = {
:a_channel_arg => 'an_arg',
:server_override => @server,
:completion_queue_override => @server_queue,
:pool_size => 1,
:poll_period => 1,
:max_waiting_requests => 0
a_channel_arg: 'an_arg',
server_override: @server,
completion_queue_override: @server_queue,
pool_size: 1,
poll_period: 1,
max_waiting_requests: 0
}
alt_srv = RpcServer.new(**opts)
alt_srv.handle(SlowService)
t = Thread.new { alt_srv.run }
Thread.new { alt_srv.run }
alt_srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary, use as many to ensure the server pool is exceeded
threads = []
_1_failed_as_unavailable = false
n.times do |x|
one_failed_as_unavailable = false
n.times do
threads << Thread.new do
cq = GRPC::Core::CompletionQueue.new
stub = SlowStub.new(@host, **@client_opts)
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
_1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
one_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
end
end
end
threads.each { |t| t.join }
threads.each(&:join)
alt_srv.stop
expect(_1_failed_as_unavailable).to be(true)
expect(one_failed_as_unavailable).to be(true)
end
end
end
end

@ -31,23 +31,24 @@ require 'grpc'
require 'grpc/generic/rpc_desc'
require 'grpc/generic/service'
# A test message that encodes/decodes using marshal/marshal.
class GoodMsg
def self.marshal(o)
def self.marshal(_o)
''
end
def self.unmarshal(o)
def self.unmarshal(_o)
GoodMsg.new
end
end
# A test message that encodes/decodes using encode/decode.
class EncodeDecodeMsg
def self.encode(o)
def self.encode(_o)
''
end
def self.decode(o)
def self.decode(_o)
GoodMsg.new
end
end
@ -55,7 +56,6 @@ end
GenericService = GRPC::GenericService
Dsl = GenericService::Dsl
describe 'String#underscore' do
it 'should convert CamelCase to underscore separated' do
expect('AnRPC'.underscore).to eq('an_rpc')
@ -66,20 +66,14 @@ describe 'String#underscore' do
end
describe Dsl do
it 'can be included in new classes' do
blk = Proc.new do
c = Class.new { include Dsl }
end
blk = proc { Class.new { include Dsl } }
expect(&blk).to_not raise_error
end
end
describe GenericService do
describe 'including it' do
it 'adds a class method, rpc' do
c = Class.new do
include GenericService
@ -144,9 +138,8 @@ describe GenericService do
end
describe '#include' do
it 'raises if #rpc is missing an arg' do
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc, GoodMsg
@ -154,7 +147,7 @@ describe GenericService do
end
expect(&blk).to raise_error ArgumentError
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc
@ -164,9 +157,8 @@ describe GenericService do
end
describe 'when #rpc args are incorrect' do
it 'raises if an arg does not have the marshal or unmarshal methods' do
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc, GoodMsg, Object
@ -176,13 +168,14 @@ describe GenericService do
end
it 'raises if a type arg only has the marshal method' do
# a bad message type with only a marshal method
class OnlyMarshal
def marshal(o)
o
end
end
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc, OnlyMarshal, GoodMsg
@ -192,12 +185,13 @@ describe GenericService do
end
it 'raises if a type arg only has the unmarshal method' do
# a bad message type with only an unmarshal method
class OnlyUnmarshal
def self.ummarshal(o)
o
end
end
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc, GoodMsg, OnlyUnmarshal
@ -208,7 +202,7 @@ describe GenericService do
end
it 'is ok for services that expect the default {un,}marshal methods' do
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
rpc :AnRpc, GoodMsg, GoodMsg
@ -218,7 +212,7 @@ describe GenericService do
end
it 'is ok for services that override the default {un,}marshal methods' do
blk = Proc.new do
blk = proc do
Class.new do
include GenericService
self.marshal_class_method = :encode
@ -228,11 +222,9 @@ describe GenericService do
end
expect(&blk).not_to raise_error
end
end
describe '#rpc_stub_class' do
it 'generates a client class that defines any of the rpc methods' do
s = Class.new do
include GenericService
@ -249,7 +241,6 @@ describe GenericService do
end
describe 'the generated instances' do
it 'can be instanciated with just a hostname' do
s = Class.new do
include GenericService
@ -277,13 +268,10 @@ describe GenericService do
expect(o.methods).to include(:a_client_streamer)
expect(o.methods).to include(:a_bidi_streamer)
end
end
end
describe '#assert_rpc_descs_have_methods' do
it 'fails if there is no instance method for an rpc descriptor' do
c1 = Class.new do
include GenericService
@ -310,16 +298,16 @@ describe GenericService do
rpc :AClientStreamer, stream(GoodMsg), GoodMsg
rpc :ABidiStreamer, stream(GoodMsg), stream(GoodMsg)
def an_rpc(req, call)
def an_rpc(_req, _call)
end
def a_server_streamer(req, call)
def a_server_streamer(_req, _call)
end
def a_client_streamer(call)
def a_client_streamer(_call)
end
def a_bidi_streamer(call)
def a_bidi_streamer(_call)
end
end
expect { c.assert_rpc_descs_have_methods }.to_not raise_error
@ -330,7 +318,7 @@ describe GenericService do
include GenericService
rpc :AnRpc, GoodMsg, GoodMsg
def an_rpc(req, call)
def an_rpc(_req, _call)
end
end
c = Class.new(base)
@ -344,13 +332,11 @@ describe GenericService do
rpc :AnRpc, GoodMsg, GoodMsg
end
c = Class.new(base) do
def an_rpc(req, call)
def an_rpc(_req, _call)
end
end
expect { c.assert_rpc_descs_have_methods }.to_not raise_error
expect(c.include?(GenericService)).to be(true)
end
end
end

@ -30,7 +30,6 @@
require 'grpc'
describe GRPC::Core::Metadata do
describe '#new' do
it 'should create instances' do
expect { GRPC::Core::Metadata.new('a key', 'a value') }.to_not raise_error
@ -62,5 +61,4 @@ describe GRPC::Core::Metadata do
expect(md.dup.value).to eq('a value')
end
end
end

@ -32,7 +32,7 @@ require 'socket'
# @param [Fixnum] the minimum port number to accept
# @param [Fixnum] the maximum port number to accept
# @return [Fixnum ]a free tcp port
def find_unused_tcp_port(min=32768, max=60000)
def find_unused_tcp_port(min = 32_768, max = 60_000)
# Allow the system to assign a port, by specifying 0.
# Loop until a port is assigned in the required range
loop do
@ -40,6 +40,6 @@ def find_unused_tcp_port(min=32768, max=60000)
socket.bind(Addrinfo.tcp('127.0.0.1', 0))
p = socket.local_address.ip_port
socket.close
return p if p > min and p < 60000
return p if p > min && p < max
end
end

@ -35,13 +35,10 @@ def load_test_certs
files.map { |f| File.open(File.join(test_root, f)).read }
end
describe GRPC::Core::ServerCredentials do
Creds = GRPC::Core::ServerCredentials
describe '#new' do
it 'can be constructed from a fake CA PEM, server PEM and a server key' do
expect { Creds.new('a', 'b', 'c') }.not_to raise_error
end
@ -53,22 +50,20 @@ describe GRPC::Core::ServerCredentials do
it 'cannot be constructed without a server cert chain' do
root_cert, server_key, _ = load_test_certs
blk = Proc.new { Creds.new(root_cert, server_key, nil) }
blk = proc { Creds.new(root_cert, server_key, nil) }
expect(&blk).to raise_error
end
it 'cannot be constructed without a server key' do
root_cert, server_key, _ = load_test_certs
blk = Proc.new { Creds.new(root_cert, _, cert_chain) }
root_cert, _, _ = load_test_certs
blk = proc { Creds.new(root_cert, nil, cert_chain) }
expect(&blk).to raise_error
end
it 'can be constructed without a root_cret' do
_, server_key, cert_chain = load_test_certs
blk = Proc.new { Creds.new(_, server_key, cert_chain) }
blk = proc { Creds.new(nil, server_key, cert_chain) }
expect(&blk).to_not raise_error
end
end
end

@ -39,7 +39,6 @@ end
Server = GRPC::Core::Server
describe Server do
def create_test_cert
GRPC::Core::ServerCredentials.new(*load_test_certs)
end
@ -49,11 +48,8 @@ describe Server do
end
describe '#start' do
it 'runs without failing' do
blk = Proc.new do
s = Server.new(@cq, nil).start
end
blk = proc { Server.new(@cq, nil).start }
expect(&blk).to_not raise_error
end
@ -62,20 +58,19 @@ describe Server do
s.close
expect { s.start }.to raise_error(RuntimeError)
end
end
describe '#destroy' do
it 'destroys a server ok' do
s = start_a_server
blk = Proc.new { s.destroy }
blk = proc { s.destroy }
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
s = start_a_server
begin
blk = Proc.new { s.destroy }
blk = proc { s.destroy }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@ -89,7 +84,7 @@ describe Server do
it 'closes a server ok' do
s = start_a_server
begin
blk = Proc.new { s.close }
blk = proc { s.close }
expect(&blk).to_not raise_error
ensure
s.close
@ -98,7 +93,7 @@ describe Server do
it 'can be called more than once without error' do
s = start_a_server
blk = Proc.new { s.close }
blk = proc { s.close }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@ -106,11 +101,9 @@ describe Server do
end
describe '#add_http_port' do
describe 'for insecure servers' do
it 'runs without failing' do
blk = Proc.new do
blk = proc do
s = Server.new(@cq, nil)
s.add_http2_port('localhost:0')
s.close
@ -123,13 +116,11 @@ describe Server do
s.close
expect { s.add_http2_port('localhost:0') }.to raise_error(RuntimeError)
end
end
describe 'for secure servers' do
it 'runs without failing' do
blk = Proc.new do
blk = proc do
s = Server.new(@cq, nil)
s.add_http2_port('localhost:0', true)
s.close
@ -140,16 +131,13 @@ describe Server do
it 'fails if the server is closed' do
s = Server.new(@cq, nil)
s.close
blk = Proc.new { s.add_http2_port('localhost:0', true) }
blk = proc { s.add_http2_port('localhost:0', true) }
expect(&blk).to raise_error(RuntimeError)
end
end
end
shared_examples '#new' do
it 'takes a completion queue with nil channel args' do
expect { Server.new(@cq, nil, create_test_cert) }.to_not raise_error
end
@ -162,14 +150,14 @@ describe Server do
end
it 'does not take a hash with bad values as channel args' do
blk = construct_with_args(:symbol => Object.new)
blk = construct_with_args(symbol: Object.new)
expect(&blk).to raise_error TypeError
blk = construct_with_args('1' => Hash.new)
expect(&blk).to raise_error TypeError
end
it 'can take a hash with a symbol key as channel args' do
blk = construct_with_args(:a_symbol => 1)
blk = construct_with_args(a_symbol: 1)
expect(&blk).to_not raise_error
end
@ -179,17 +167,17 @@ describe Server do
end
it 'can take a hash with a string value as channel args' do
blk = construct_with_args(:a_symbol => '1')
blk = construct_with_args(a_symbol: '1')
expect(&blk).to_not raise_error
end
it 'can take a hash with a symbol value as channel args' do
blk = construct_with_args(:a_symbol => :another_symbol)
blk = construct_with_args(a_symbol: :another_symbol)
expect(&blk).to_not raise_error
end
it 'can take a hash with a numeric value as channel args' do
blk = construct_with_args(:a_symbol => 1)
blk = construct_with_args(a_symbol: 1)
expect(&blk).to_not raise_error
end
@ -198,27 +186,22 @@ describe Server do
blk = construct_with_args(args)
expect(&blk).to_not raise_error
end
end
describe '#new with an insecure channel' do
def construct_with_args(a)
Proc.new { Server.new(@cq, a) }
proc { Server.new(@cq, a) }
end
it_behaves_like '#new'
end
describe '#new with a secure channel' do
def construct_with_args(a)
Proc.new { Server.new(@cq, a, create_test_cert) }
proc { Server.new(@cq, a, create_test_cert) }
end
it_behaves_like '#new'
end
def start_a_server
@ -229,5 +212,4 @@ describe Server do
s.start
s
end
end

@ -32,7 +32,6 @@ require 'grpc'
TimeConsts = GRPC::Core::TimeConsts
describe TimeConsts do
before(:each) do
@known_consts = [:ZERO, :INFINITE_FUTURE, :INFINITE_PAST].sort
end
@ -49,11 +48,9 @@ describe TimeConsts do
end
end
end
end
describe '#from_relative_time' do
it 'cannot handle arbitrary objects' do
expect { TimeConsts.from_relative_time(Object.new) }.to raise_error
end
@ -89,5 +86,4 @@ describe '#from_relative_time' do
expect(abs.to_f).to be_within(epsilon).of(want.to_f)
end
end
end

Loading…
Cancel
Save