Breaks API - switches timeout to a keyword arg.

- timeout has been a default arg till now

- this switches it to a keyword arg with the same behavior

- in addition, it adds deadline as distinct keyword arg, allowing users
  the choice of the idiomatic(timeout) or the aligned(deadline)
pull/2937/head
Tim Emiola 9 years ago
parent 685674c554
commit 49b7650eaf
  1. 9
      src/ruby/lib/grpc/generic/active_call.rb
  2. 4
      src/ruby/lib/grpc/generic/bidi_call.rb
  3. 51
      src/ruby/lib/grpc/generic/client_stub.rb
  4. 18
      src/ruby/lib/grpc/generic/service.rb
  5. 29
      src/ruby/spec/generic/active_call_spec.rb
  6. 9
      src/ruby/spec/generic/rpc_server_spec.rb

@ -74,8 +74,7 @@ module GRPC
# #
# @param call [Call] a call on which to start and invocation # @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] the completion queue # @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline def self.client_invoke(call, q, **kw)
def self.client_invoke(call, q, _deadline, **kw)
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue unless q.is_a? Core::CompletionQueue
fail(TypeError, '!Core::CompletionQueue') fail(TypeError, '!Core::CompletionQueue')
@ -418,7 +417,7 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator # @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk) def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
bd.run_on_client(requests, @op_notifier, &blk) bd.run_on_client(requests, @op_notifier, &blk)
end end
@ -434,7 +433,7 @@ module GRPC
# #
# @param gen_each_reply [Proc] generates the BiDi stream replies # @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply) def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
bd.run_on_server(gen_each_reply) bd.run_on_server(gen_each_reply)
end end
@ -456,7 +455,7 @@ module GRPC
# Starts the call if not already started # Starts the call if not already started
def start_call(**kw) def start_call(**kw)
return if @started return if @started
@metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw)
@started = true @started = true
end end

@ -56,15 +56,13 @@ module GRPC
# the call # the call
# @param marshal [Function] f(obj)->string that marshal requests # @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete def initialize(call, q, marshal, unmarshal)
def initialize(call, q, marshal, unmarshal, deadline)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue') fail(ArgumentError, 'not a CompletionQueue')
end end
@call = call @call = call
@cq = q @cq = q
@deadline = deadline
@marshal = marshal @marshal = marshal
@op_notifier = nil # signals completion on clients @op_notifier = nil # signals completion on clients
@readq = Queue.new @readq = Queue.new

@ -161,15 +161,21 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param parent [Core::Call] a prior call whose reserved metadata # @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one. # will be propagated by this one.
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server # @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, timeout = nil, def request_response(method, req, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false, return_op: false,
parent: parent, parent: parent,
**kw) **kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op return c.request_response(req, **md) unless return_op
@ -222,16 +228,22 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send # @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata # @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one. # will be propagated by this one.
# @return [Object|Operation] the response received from the server # @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, timeout = nil, def client_streamer(method, requests, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false, return_op: false,
parent: nil, parent: nil,
**kw) **kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op return c.client_streamer(requests, **md) unless return_op
@ -292,18 +304,24 @@ module GRPC
# @param req [Object] the request sent to the server # @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param return_op [true|false]return an Operation if true # @param return_op [true|false]return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata # @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one. # will be propagated by this one.
# @param blk [Block] when provided, is executed for each response # @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above # @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, timeout = nil, def server_streamer(method, req, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false, return_op: false,
parent: nil, parent: nil,
**kw, **kw,
&blk) &blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op return c.server_streamer(req, **md, &blk) unless return_op
@ -404,17 +422,23 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param deadline [Time] (optional) the time the request should complete
# @param parent [Core::Call] a prior call whose reserved metadata # @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one. # will be propagated by this one.
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @param blk [Block] when provided, is executed for each response # @param blk [Block] when provided, is executed for each response
# @return [Enumerator|nil|Operation] as discussed above # @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, def bidi_streamer(method, requests, marshal, unmarshal,
deadline: nil,
timeout: nil,
return_op: false, return_op: false,
parent: nil, parent: nil,
**kw, **kw,
&blk) &blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) c = new_active_call(method, marshal, unmarshal,
deadline: deadline,
timeout: timeout,
parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op return c.bidi_streamer(requests, **md, &blk) unless return_op
@ -438,8 +462,13 @@ module GRPC
# @param parent [Grpc::Call] a parent call, available when calls are # @param parent [Grpc::Call] a parent call, available when calls are
# made from server # made from server
# @param timeout [TimeConst] # @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil) def new_active_call(method, marshal, unmarshal,
deadline: nil,
timeout: nil,
parent: nil)
if deadline.nil?
deadline = from_relative_time(timeout.nil? ? @timeout : timeout) deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
end
call = @ch.create_call(@queue, call = @ch.create_call(@queue,
parent, # parent call parent, # parent call
@propagate_mask, # propagation options @propagate_mask, # propagation options

@ -174,26 +174,24 @@ module GRPC
unmarshal = desc.unmarshal_proc(:output) unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}" route = "/#{route_prefix}/#{name}"
if desc.request_response? if desc.request_response?
define_method(mth_name) do |req, deadline = nil, **kw| define_method(mth_name) do |req, **kw|
GRPC.logger.debug("calling #{@host}:#{route}") GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline, **kw) request_response(route, req, marshal, unmarshal, **kw)
end end
elsif desc.client_streamer? elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil, **kw| define_method(mth_name) do |reqs, **kw|
GRPC.logger.debug("calling #{@host}:#{route}") GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw) client_streamer(route, reqs, marshal, unmarshal, **kw)
end end
elsif desc.server_streamer? elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, **kw, &blk| define_method(mth_name) do |req, **kw, &blk|
GRPC.logger.debug("calling #{@host}:#{route}") GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, **kw, server_streamer(route, req, marshal, unmarshal, **kw, &blk)
&blk)
end end
else # is a bidi_stream else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk| define_method(mth_name) do |reqs, **kw, &blk|
GRPC.logger.debug("calling #{@host}:#{route}") GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw, bidi_streamer(route, reqs, marshal, unmarshal, **kw, &blk)
&blk)
end end
end end
end end

@ -57,7 +57,7 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do describe 'restricted view methods' do
before(:each) do before(:each) do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
@client_call = ActiveCall.new(call, @client_queue, @pass_through, @client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -87,7 +87,7 @@ describe GRPC::ActiveCall do
describe '#remote_send' do describe '#remote_send' do
it 'allows a client to send a payload to the server' do it 'allows a client to send a payload to the server' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
@client_call = ActiveCall.new(call, @client_queue, @pass_through, @client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -111,7 +111,7 @@ describe GRPC::ActiveCall do
it 'marshals the payload using the marshal func' do it 'marshals the payload using the marshal func' do
call = make_test_call call = make_test_call
ActiveCall.client_invoke(call, @client_queue, deadline) ActiveCall.client_invoke(call, @client_queue)
marshal = proc { |x| 'marshalled:' + x } marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal, client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline) @pass_through, deadline)
@ -134,8 +134,7 @@ describe GRPC::ActiveCall do
describe '#client_invoke' do describe '#client_invoke' do
it 'sends keywords as metadata to the server when the are present' do it 'sends keywords as metadata to the server when the are present' do
call = make_test_call call = make_test_call
ActiveCall.client_invoke(call, @client_queue, deadline, ActiveCall.client_invoke(call, @client_queue, k1: 'v1', k2: 'v2')
k1: 'v1', k2: 'v2')
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil expect(recvd_call).to_not be_nil
@ -148,7 +147,7 @@ describe GRPC::ActiveCall do
describe '#remote_read' do describe '#remote_read' do
it 'reads the response sent by a server' do it 'reads the response sent by a server' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -161,7 +160,7 @@ describe GRPC::ActiveCall do
it 'saves no metadata when the server adds no metadata' do it 'saves no metadata when the server adds no metadata' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -176,7 +175,7 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do it 'saves metadata add by the server' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -192,7 +191,7 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -209,7 +208,7 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do it 'unmarshals the response using the unmarshal func' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
unmarshal = proc { |x| 'unmarshalled:' + x } unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline, unmarshal, deadline,
@ -234,7 +233,7 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do it 'the returns an enumerator that can read n responses' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -252,7 +251,7 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -275,7 +274,7 @@ describe GRPC::ActiveCall do
describe '#writes_done' do describe '#writes_done' do
it 'finishes ok if the server sends a status response' do it 'finishes ok if the server sends a status response' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -291,7 +290,7 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do it 'finishes ok if the server sends an early status response' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)
@ -307,7 +306,7 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do it 'finishes ok if writes_done is true' do
call = make_test_call call = make_test_call
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) md_tag = ActiveCall.client_invoke(call, @client_queue)
client_call = ActiveCall.new(call, @client_queue, @pass_through, client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline, @pass_through, deadline,
metadata_tag: md_tag) metadata_tag: md_tag)

@ -396,8 +396,9 @@ describe GRPC::RpcServer do
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = SlowStub.new(@host, **client_opts) stub = SlowStub.new(@host, **client_opts)
deadline = service.delay + 1.0 # wait for long enough timeout = service.delay + 1.0 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2')
expect(resp).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
check_md(wanted_md, service.received_md) check_md(wanted_md, service.received_md)
@srv.stop @srv.stop
@ -411,8 +412,8 @@ describe GRPC::RpcServer do
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = SlowStub.new(@host, **client_opts) stub = SlowStub.new(@host, **client_opts)
deadline = 0.1 # too short for SlowService to respond timeout = 0.1 # too short for SlowService to respond
blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } blk = proc { stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') }
expect(&blk).to raise_error GRPC::BadStatus expect(&blk).to raise_error GRPC::BadStatus
wanted_md = [] wanted_md = []
expect(service.received_md).to eq(wanted_md) expect(service.received_md).to eq(wanted_md)

Loading…
Cancel
Save