Updates BidiCall/ClientStub to the new Call API

pull/1188/head
Tim Emiola 10 years ago
parent b22a21ebe7
commit f90ce677b3
  1. 69
      src/ruby/lib/grpc/generic/bidi_call.rb
  2. 50
      src/ruby/lib/grpc/generic/client_stub.rb
  3. 136
      src/ruby/spec/generic/client_stub_spec.rb

@ -30,18 +30,12 @@
require 'forwardable'
require 'grpc/grpc'
def assert_event_type(ev, want)
fail OutOfTime if ev.nil?
got = ev.type
fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
end
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
include Core::CompletionType
include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
@ -63,8 +57,7 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
def initialize(call, q, marshal, unmarshal, deadline)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@ -72,7 +65,6 @@ module GRPC
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
@ -146,30 +138,14 @@ module GRPC
requests.each do |req|
count += 1
payload = @marshal.call(req)
@call.start_write(Core::ByteBuffer.new(payload), write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
@call.writes_done(write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISHED)
ensure
ev.close
end
logger.debug('bidi-client: finished received')
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
end
rescue StandardError => e
logger.warn('bidi: write_loop failed')
@ -189,25 +165,20 @@ module GRPC
loop do
logger.debug("waiting for read #{count}")
count += 1
@call.start_read(read_tag)
ev = @cq.pluck(read_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
# handle the next event.
if ev.result.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
end
# push the latest read onto the queue and continue reading
logger.debug("received req: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
ensure
ev.close
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
end
# push the latest read onto the queue and continue reading
logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e

@ -35,9 +35,10 @@ module GRPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
include Core::TimeConsts
# Default deadline is 5 seconds.
DEFAULT_DEADLINE = 5
# Default timeout is 5 seconds.
DEFAULT_TIMEOUT = 5
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
@ -76,8 +77,8 @@ module GRPC
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
# - :deadline
# when present, this is the default deadline used for calls
# - :timeout
# when present, this is the default timeout used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
@ -87,13 +88,13 @@ module GRPC
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param timeout [Number] the default timeout to use in requests
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override: nil,
deadline: DEFAULT_DEADLINE,
timeout: nil,
creds: nil,
update_metadata: nil,
**kw)
@ -103,7 +104,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
@deadline = deadline
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end
# request_response sends a request to a GRPC server, and returns the
@ -140,12 +141,12 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline = nil,
def request_response(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
c = new_active_call(method, marshal, unmarshal, timeout)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.request_response(req, **md) unless return_op
@ -197,12 +198,12 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
c = new_active_call(method, marshal, unmarshal, timeout)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.client_streamer(requests, **md) unless return_op
@ -262,13 +263,13 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, deadline = nil,
def server_streamer(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
c = new_active_call(method, marshal, unmarshal, timeout)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.server_streamer(req, **md, &blk) unless return_op
@ -367,13 +368,13 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
c = new_active_call(method, marshal, unmarshal, timeout)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op
@ -393,12 +394,11 @@ module GRPC
# @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [TimeConst]
def new_active_call(method, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
call = @ch.create_call(@queue, method, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
# @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal, timeout = nil)
deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
call = @ch.create_call(@queue, method, @host, deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end
end

@ -30,15 +30,41 @@
require 'grpc'
require 'xray/thread_dump_signal_handler'
NOOP = proc { |x| x }
FAKE_HOST = 'localhost:0'
# Notifier is useful high-level synchronization primitive.
class Notifier
attr_reader :payload, :notified
alias_method :notified?, :notified
def initialize
@mutex = Mutex.new
@cvar = ConditionVariable.new
@notified = false
@payload = nil
end
def wait
@mutex.synchronize do
@cvar.wait(@mutex) until notified?
end
end
def notify(payload)
@mutex.synchronize do
return Error.new('already notified') if notified?
@payload = payload
@notified = true
@cvar.signal
return nil
end
end
end
def wakey_thread(&blk)
awake_mutex, awake_cond = Mutex.new, ConditionVariable.new
n = Notifier.new
t = Thread.new do
blk.call(awake_mutex, awake_cond)
blk.call(n)
end
awake_mutex.synchronize { awake_cond.wait(awake_mutex) }
n.wait
t
end
@ -50,8 +76,11 @@ end
include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps
describe 'ClientStub' do
let(:noop) { proc { |x| x } }
before(:each) do
Thread.abort_on_exception = true
@server = nil
@ -66,61 +95,56 @@ describe 'ClientStub' do
end
describe '#new' do
let(:fake_host) { 'localhost:0' }
it 'can be created from a host and args' do
host = FAKE_HOST
opts = { a_channel_arg: 'an_arg' }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', deadline: 5 }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with an channel override' do
host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', channel_override: @ch }
blk = proc do
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad channel override' do
host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'cannot be created with bad credentials' do
host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', creds: Object.new }
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'can be created with test test credentials' do
certs = load_test_certs
host = FAKE_HOST
blk = proc do
opts = {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
a_channel_arg: 'an_arg',
creds: GRPC::Core::Credentials.new(certs[0], nil, nil)
}
GRPC::ClientStub.new(host, @cq, **opts)
GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to_not raise_error
end
@ -187,7 +211,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
stub.request_response(@method, @sent_msg, NOOP, NOOP,
stub.request_response(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
end
@ -196,7 +220,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@ -259,7 +283,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
stub.client_streamer(@method, @sent_msgs, noop, noop,
k1: 'v1', k2: 'v2')
end
@ -268,7 +292,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@ -333,7 +357,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
e = stub.server_streamer(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
expect(e).to be_a(Enumerator)
e
@ -344,7 +368,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@ -361,34 +385,30 @@ describe 'ClientStub' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
server_port = create_test_server
@host = "localhost:#{server_port}"
end
it 'supports sending all the requests first', bidi: true do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(host, @cq)
stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
it 'supports client-initiated ping pong', bidi: true do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(host, @cq)
stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it 'supports a server-initiated ping pong', bidi: true do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(host, @cq)
stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
@ -397,7 +417,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
expect(e).to be_a(Enumerator)
e
end
@ -407,7 +427,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@ -421,8 +441,8 @@ describe 'ClientStub' do
def run_server_streamer(expected_input, replys, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(mtx, cnd)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
@ -434,8 +454,8 @@ describe 'ClientStub' do
def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
status)
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(mtx, cnd)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
@ -443,8 +463,8 @@ describe 'ClientStub' do
end
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(mtx, cnd)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
@ -460,8 +480,8 @@ describe 'ClientStub' do
def run_client_streamer(expected_inputs, resp, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(mtx, cnd)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@ -473,8 +493,8 @@ describe 'ClientStub' do
def run_request_response(expected_input, resp, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(mtx, cnd)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@ -490,24 +510,16 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0')
end
def start_test_server(awake_mutex, awake_cond)
def expect_server_to_be_invoked(notifier)
@server.start
@server_tag = Object.new
@server.request_call(@server_tag)
awake_mutex.synchronize { awake_cond.signal }
end
def expect_server_to_be_invoked(awake_mutex, awake_cond)
start_test_server(awake_mutex, awake_cond)
ev = @server_queue.pluck(@server_tag, INFINITE_FUTURE)
fail OutOfTime if ev.nil?
server_call = ev.call
server_call.metadata = ev.result.metadata
finished_tag = Object.new
server_call.server_accept(@server_queue, finished_tag)
server_call.server_end_initial_metadata
GRPC::ActiveCall.new(server_call, @server_queue, NOOP, NOOP,
INFINITE_FUTURE,
finished_tag: finished_tag)
notifier.notify(nil)
server_tag = Object.new
recvd_rpc = @server.request_call(@server_queue, server_tag,
INFINITE_FUTURE)
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
SEND_INITIAL_METADATA => nil)
GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
end
end

Loading…
Cancel
Save