Adds support for metadata to the surface APIs

- received metadata is visible on the ActiveCall object
- metadata to send is keyword args on ActiveCall objects

Also
- fixes a typo that meant that the wrong error error code might be returned
- fixes bad references in some tests that are only visible when run via rspec
	Change on 2014/12/10 by temiola <temiola@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81811008
pull/1/merge
temiola 10 years ago committed by Michael Lumish
parent 9db8509c9a
commit 6919c7595b
  1. 5
      src/ruby/ext/grpc/rb_event.c
  2. 3
      src/ruby/ext/grpc/rb_metadata.c
  3. 74
      src/ruby/lib/grpc/generic/active_call.rb
  4. 81
      src/ruby/lib/grpc/generic/client_stub.rb
  5. 1
      src/ruby/lib/grpc/generic/rpc_server.rb
  6. 157
      src/ruby/spec/generic/active_call_spec.rb
  7. 313
      src/ruby/spec/generic/client_stub_spec.rb
  8. 7
      src/ruby/spec/generic/rpc_server_spec.rb

@ -205,7 +205,7 @@ static VALUE grpc_rb_event_result(VALUE self) {
return Qnil;
}
rb_raise(rb_eEventError, "write failed, not sure why (code=%d)",
event->data.invoke_accepted);
event->data.write_accepted);
break;
case GRPC_CLIENT_METADATA_READ:
@ -263,7 +263,8 @@ void Init_google_rpc_event() {
rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
/* Constants representing the completion types */
rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore, "CompletionType");
rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore,
"CompletionType");
rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN",
INT2NUM(GRPC_QUEUE_SHUTDOWN));
rb_define_const(rb_mCompletionType, "READ", INT2NUM(GRPC_READ));

@ -189,7 +189,8 @@ static VALUE grpc_rb_metadata_value(VALUE self) {
/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */
VALUE rb_cMetadata = Qnil;
void Init_google_rpc_metadata() {
rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata", rb_cObject);
rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata",
rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc);

@ -31,7 +31,9 @@ require 'forwardable'
require 'grpc'
require 'grpc/generic/bidi_call'
def assert_event_type(got, want)
def assert_event_type(ev, want)
raise OutOfTime if ev.nil?
got = ev.type
raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want
end
@ -52,21 +54,28 @@ module GRPC
#
# deadline is the absolute deadline for the call.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED
# @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED
def self.client_start_invoke(call, q, deadline)
def self.client_start_invoke(call, q, deadline, **kw)
raise ArgumentError.new('not a call') unless call.is_a?Core::Call
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
invoke_accepted, client_metadata_read = Object.new, Object.new
finished_tag = Object.new
call.start_invoke(q, invoke_accepted, client_metadata_read, finished_tag)
# wait for the invocation to be accepted
ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
finished_tag
[finished_tag, client_metadata_read]
end
# Creates an ActiveCall.
@ -91,9 +100,11 @@ module GRPC
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] (default true) indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
started: true)
read_metadata_tag: nil, started: true)
raise ArgumentError.new('not a call') unless call.is_a?Core::Call
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
@ -102,6 +113,7 @@ module GRPC
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
@ -180,7 +192,7 @@ module GRPC
def writes_done(assert_finished=true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev.type, FINISH_ACCEPTED)
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
if assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
@ -229,7 +241,7 @@ module GRPC
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev.type, WRITE_ACCEPTED)
assert_event_type(ev, WRITE_ACCEPTED)
ev = nil
end
@ -243,7 +255,7 @@ module GRPC
assert_queue_is_ready
@call.start_write_status(Core::Status.new(code, details), self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev.type, FINISH_ACCEPTED)
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Status sent: #{code}:'#{details}'")
if assert_finished
return finished
@ -257,9 +269,16 @@ module GRPC
# a READ, it returns the response after unmarshalling it. On
# FINISHED, it returns nil if the status is OK, otherwise raising BadStatus
def remote_read
if @call.metadata.nil? && !@read_metadata_tag.nil?
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
assert_event_type(ev, CLIENT_METADATA_READ)
@call.metadata = ev.result
@read_metadata_tag = nil
end
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
assert_event_type(ev.type, READ)
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
if !ev.result.nil?
logger.debug("received req.to_s: #{ev.result.to_s}")
@ -333,10 +352,15 @@ module GRPC
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @return [Object] the response received from the server
def request_response(req)
start_call unless @started
def request_response(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
response = remote_read
@ -354,10 +378,14 @@ module GRPC
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Object] the response received from the server
def client_streamer(requests)
start_call unless @started
def client_streamer(requests, **kw)
start_call(**kw) unless @started
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
@ -377,10 +405,15 @@ module GRPC
# it is executed with each response as the argument and no result is
# returned.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
# any keyword arguments are treated as metadata to be sent to the server.
#
# @param req [Object] the request sent to the server
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req)
start_call unless @started
def server_streamer(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
@ -410,10 +443,14 @@ module GRPC
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, &blk)
start_call unless @started
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd.run_on_client(requests, &blk)
@ -438,8 +475,9 @@ module GRPC
private
def start_call
@finished_tag = ActiveCall.client_start_invoke(@call, @cq, @deadline)
def start_call(**kw)
tags = ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw)
@finished_tag, @read_metadata_tag = tags
@started = true
end

@ -50,7 +50,7 @@ module GRPC
# Any arbitrary keyword arguments are treated as channel arguments used to
# configure the RPC connection to the host.
#
# There are two specific keywords are that not used to configure the
# There are some specific keyword args that are not used to configure the
# channel:
#
# - :channel_override
@ -61,21 +61,30 @@ module GRPC
# - :deadline
# when present, this is the default deadline used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
# it can be used to update metadata, i.e, remove, change or update
# amend metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param creds [Core::Credentials] secures and/or authenticates the channel
# @param kw [KeywordArgs] the channel arguments
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override:nil,
deadline:DEFAULT_DEADLINE,
creds:nil,
deadline: DEFAULT_DEADLINE,
creds: nil,
update_metadata: nil,
**kw)
if !q.is_a?Core::CompletionQueue
raise ArgumentError.new('not a CompletionQueue')
end
@host = host
@queue = q
# set the channel instance
if !channel_override.nil?
ch = channel_override
raise ArgumentError.new('not a Channel') unless ch.is_a?(Core::Channel)
@ -86,10 +95,19 @@ module GRPC
else
ch = Core::Channel.new(host, kw, creds)
end
@ch = ch
@update_metadata = nil
if !update_metadata.nil?
if !update_metadata.is_a?(Proc)
raise ArgumentError.new('update_metadata is not a Proc')
end
@update_metadata = update_metadata
end
@host = host
@deadline = deadline
@ch = ch
@queue = q
end
# request_response sends a request to a GRPC server, and returns the
@ -117,6 +135,11 @@ module GRPC
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
@ -125,15 +148,16 @@ module GRPC
# @param return_op [true|false] (default false) return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline=nil,
return_op:false)
return_op:false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
return c.request_response(req) unless return_op
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.request_response(req, **md) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req)
c.request_response(req, **md)
end
op
end
@ -168,6 +192,11 @@ module GRPC
#
# If return_op is true, the call returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
@ -176,15 +205,16 @@ module GRPC
# @param return_op [true|false] (default false) return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline=nil,
return_op:false)
return_op:false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
return c.client_streamer(requests) unless return_op
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.client_streamer(requests, **md) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
op = c.operation
op.define_singleton_method(:execute) do
c.client_streamer(requests)
c.client_streamer(requests, **md)
end
op
end
@ -227,6 +257,11 @@ module GRPC
# calls the given block with each response or returns an Enumerator of the
# responses.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
@ -236,15 +271,16 @@ module GRPC
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, deadline=nil,
return_op:false, &blk)
return_op:false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
return c.server_streamer(req, &blk) unless return_op
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.server_streamer(req, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.server_streamer(req, &blk)
c.server_streamer(req, **md, &blk)
end
op
end
@ -313,6 +349,12 @@ module GRPC
#
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
@ -332,15 +374,16 @@ module GRPC
# @param return_op [true|false] (default false) return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline=nil,
return_op:false, &blk)
return_op:false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
return c.bidi_streamer(requests, &blk) unless return_op
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.bidi_streamer(requests, &blk)
c.bidi_streamer(requests, **md, &blk)
end
op
end

@ -246,6 +246,7 @@ module GRPC
# immediately
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata on the call
call.accept(call_queue, finished_tag)
# Send UNAVAILABLE if there are too many unprocessed jobs

@ -31,14 +31,17 @@ require 'grpc'
require 'grpc/generic/active_call'
require_relative '../port_picker'
ActiveCall = GRPC::ActiveCall
include GRPC::Core::StatusCodes
describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CompletionType = GRPC::Core::CompletionType
before(:each) do
@pass_through = Proc.new { |x| x }
@server_tag = Object.new
@server_finished_tag = Object.new
@server_done_tag, meta_tag = Object.new
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@ -58,11 +61,12 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do
before(:each) do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
end
describe '#multi_req_view' do
@ -89,11 +93,12 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
@client_call.remote_send(msg)
@ -113,12 +118,13 @@ describe GRPC::ActiveCall do
it 'marshals the payload using the marshal func' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
marshal = Proc.new { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
@ -133,14 +139,31 @@ describe GRPC::ActiveCall do
end
describe '#client_start_invoke' do
it 'sends keywords as metadata to the server when the are present' do
call, pass_through = make_test_call, Proc.new { |x| x }
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline, k1: 'v1',
k2: 'v2')
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
expect(ev).to_not be_nil
expect(ev.result.metadata['k1']).to eq('v1')
expect(ev.result.metadata['k2']).to eq('v2')
end
end
describe '#remote_read' do
it 'reads the response sent by a server' do
call, pass_through = make_test_call, Proc.new { |x| x }
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -148,19 +171,56 @@ describe GRPC::ActiveCall do
expect(client_call.remote_read).to eq('server_response')
end
it 'saves metadata { status=200 } when the server adds no metadata' do
call, pass_through = make_test_call, Proc.new { |x| x }
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
server_call.remote_send('ignore me')
expect(client_call.metadata).to be_nil
client_call.remote_read
expect(client_call.metadata).to eq({':status' => '200'})
end
it 'saves metadata add by the server' do
call, pass_through = make_test_call, Proc.new { |x| x }
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
server_call.remote_send('ignore me')
expect(client_call.metadata).to be_nil
client_call.remote_read
expect(client_call.metadata).to eq({':status' => '200', 'k1' => 'v1',
'k2' => 'v2'})
end
it 'get a nil msg before a status when an OK status is sent' do
call, pass_through = make_test_call, Proc.new { |x| x }
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(StatusCodes::OK, 'OK')
server_call.send_status(OK, 'OK')
expect(client_call.remote_read).to eq('server_response')
res = client_call.remote_read
expect(res).to be_nil
@ -169,12 +229,13 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
unmarshal = Proc.new { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
# confirm the client receives the unmarshalled message
msg = 'message is a string'
@ -196,11 +257,12 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is 4a string'
reply = 'server_response'
client_call.remote_send(msg)
@ -215,11 +277,12 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
read_metadata_tag: meta_tag,
finished_tag: done_tag)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@ -231,7 +294,7 @@ describe GRPC::ActiveCall do
server_call.remote_send(reply)
expect(e.next).to eq(reply)
end
server_call.send_status(StatusCodes::OK, 'OK')
server_call.send_status(OK, 'OK')
expect { e.next }.to raise_error(StopIteration)
end
@ -240,34 +303,36 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is a string'
client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
server_call.send_status(StatusCodes::OK, 'status code is OK')
server_call.send_status(OK, 'status code is OK')
expect { server_call.finished }.to_not raise_error
expect { client_call.finished }.to_not raise_error
end
it 'finishes ok if the server sends an early status response' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
read_metadata_tag: meta_tag,
finished_tag: done_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(StatusCodes::OK, 'status code is OK')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
expect { client_call.writes_done(false) }.to_not raise_error
expect { server_call.finished }.to_not raise_error
@ -276,16 +341,17 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: finished_tag)
read_metadata_tag: meta_tag,
finished_tag: done_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(StatusCodes::OK, 'status code is OK')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
expect { client_call.writes_done(true) }.to_not raise_error
expect { server_call.finished }.to_not raise_error
@ -293,19 +359,20 @@ describe GRPC::ActiveCall do
end
def expect_server_to_receive(sent_text)
c = expect_server_to_be_invoked
def expect_server_to_receive(sent_text, **kw)
c = expect_server_to_be_invoked(**kw)
expect(c.remote_read).to eq(sent_text)
c
end
def expect_server_to_be_invoked()
def expect_server_to_be_invoked(**kw)
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.accept(@client_queue, @server_finished_tag)
ev.call.add_metadata(kw)
ev.call.accept(@client_queue, @server_done_tag)
ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: @server_finished_tag)
finished_tag: @server_done_tag)
end
def make_test_call

@ -45,7 +45,7 @@ def wakey_thread(&blk)
end
def load_test_certs
test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata')
test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(test_root, f)).read }
end
@ -136,14 +136,32 @@ describe 'ClientStub' do
@sent_msg, @resp = 'a_msg', 'a_reply'
end
describe 'without a call operation' do
shared_examples 'request response' do
it 'should send a request to/receive a_reply from a server' do
it 'should send a request to/receive a reply from a server' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass)
stub = GRPC::ClientStub.new(host, @cq)
resp = stub.request_response(@method, @sent_msg, NOOP, NOOP)
expect(resp).to eq(@resp)
expect(get_response(stub)).to eq(@resp)
th.join
end
it 'should send metadata to the server ok' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass, k1: 'v1',
k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
expect(get_response(stub)).to eq(@resp)
th.join
end
it 'should update the sent metadata with a provided metadata updater' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
expect(get_response(stub)).to eq(@resp)
th.join
end
@ -151,10 +169,8 @@ describe 'ClientStub' do
alt_host = new_test_host
th = run_request_response(alt_host, @sent_msg, @resp, @pass)
ch = GRPC::Core::Channel.new(alt_host, nil)
stub = GRPC::ClientStub.new('ignored-host', @cq,
channel_override:ch)
resp = stub.request_response(@method, @sent_msg, NOOP, NOOP)
expect(resp).to eq(@resp)
stub = GRPC::ClientStub.new('ignored-host', @cq, channel_override:ch)
expect(get_response(stub)).to eq(@resp)
th.join
end
@ -162,89 +178,73 @@ describe 'ClientStub' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq)
blk = Proc.new do
stub.request_response(@method, @sent_msg, NOOP, NOOP)
end
blk = Proc.new { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
end
describe 'via a call operation' do
describe 'without a call operation' do
it 'should send a request to/receive a_reply from a server' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @pass)
stub = GRPC::ClientStub.new(host, @cq)
op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
resp = op.execute()
expect(resp).to eq(@resp)
th.join
def get_response(stub)
stub.request_response(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
k2: 'v2')
end
it 'should raise an error if the status is not OK' do
host = new_test_host
th = run_request_response(host, @sent_msg, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq)
it_behaves_like 'request response'
end
describe 'via a call operation' do
def get_response(stub)
op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
return_op:true)
return_op:true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
blk = Proc.new do
op.execute()
end
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
op.execute()
end
it_behaves_like 'request response'
end
end
describe '#client_streamer' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
@resp = 'a_reply'
end
shared_examples 'client streaming' do
describe 'without a call operation' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
@resp = 'a_reply'
end
it 'should send requests to/receive a reply from a server' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @pass)
stub = GRPC::ClientStub.new(host, @cq)
resp = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(resp).to eq(@resp)
expect(get_response(stub)).to eq(@resp)
th.join
end
it 'should raise an error if the status is not ok' do
it 'should send metadata to the server ok' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @fail)
th = run_client_streamer(host, @sent_msgs, @resp, @pass, k1: 'v1',
k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
blk = Proc.new do
stub.client_streamer(@method, @sent_msgs, NOOP, NOOP)
end
expect(&blk).to raise_error(BadStatus)
expect(get_response(stub)).to eq(@resp)
th.join
end
end
describe 'via a call operation' do
it 'should send requests to/receive a reply from a server' do
it 'should update the sent metadata with a provided metadata updater' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @pass)
stub = GRPC::ClientStub.new(host, @cq)
op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
resp = op.execute()
expect(resp).to eq(@resp)
th = run_client_streamer(host, @sent_msgs, @resp, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
expect(get_response(stub)).to eq(@resp)
th.join
end
@ -252,36 +252,53 @@ describe 'ClientStub' do
host = new_test_host
th = run_client_streamer(host, @sent_msgs, @resp, @fail)
stub = GRPC::ClientStub.new(host, @cq)
blk = Proc.new { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
end
describe 'without a call operation' do
def get_response(stub)
stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, k1: 'v1',
k2: 'v2')
end
it_behaves_like 'client streaming'
end
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op:true)
return_op:true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
blk = Proc.new do
op.execute()
end
expect(&blk).to raise_error(BadStatus)
th.join
resp = op.execute()
end
it_behaves_like 'client streaming'
end
end
describe '#server_streamer' do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
end
shared_examples 'server streaming' do
describe 'without a call operation' do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
end
it 'should send a request to/receive replies from a server' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @pass)
stub = GRPC::ClientStub.new(host, @cq)
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP)
expect(e).to be_a(Enumerator)
expect(e.collect { |r| r }).to eq(@replys)
expect(get_responses(stub).collect { |r| r }).to eq(@replys)
th.join
end
@ -289,60 +306,79 @@ describe 'ClientStub' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @fail)
stub = GRPC::ClientStub.new(host, @cq)
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP)
expect(e).to be_a(Enumerator)
expect { e.collect { |r| r } }.to raise_error(BadStatus)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
end
describe 'via a call operation' do
it 'should send a request to/receive replies from a server' do
it 'should send metadata to the server ok' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @pass)
th = run_server_streamer(host, @sent_msg, @replys, @fail, k1: 'v1',
k2: 'v2')
stub = GRPC::ClientStub.new(host, @cq)
op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute()
expect(e).to be_a(Enumerator)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
it 'should raise an error if the status is not ok' do
it 'should update the sent metadata with a provided metadata updater' do
host = new_test_host
th = run_server_streamer(host, @sent_msg, @replys, @fail)
stub = GRPC::ClientStub.new(host, @cq)
th = run_server_streamer(host, @sent_msg, @replys, @pass,
k1: 'updated-v1', k2: 'v2')
update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
end
describe 'without a call operation' do
def get_responses(stub)
e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
k2: 'v2')
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'server streaming'
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
return_op:true)
return_op:true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute()
expect(e).to be_a(Enumerator)
expect { e.collect { |r| r } }.to raise_error(BadStatus)
th.join
e
end
it_behaves_like 'server streaming'
end
end
describe '#bidi_streamer' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
end
describe 'without a call operation' do
shared_examples 'bidi streaming' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
end
it 'supports sending all the requests first', :bidi => true do
host = new_test_host
th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(host, @cq)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(e).to be_a(Enumerator)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
@ -351,8 +387,7 @@ describe 'ClientStub' do
host = new_test_host
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(host, @cq)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(e).to be_a(Enumerator)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
@ -367,68 +402,48 @@ describe 'ClientStub' do
host = new_test_host
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(host, @cq)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(e).to be_a(Enumerator)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
end
describe 'via a call operation' do
describe 'without a call operation' do
it 'supports sending all the requests first', :bidi => true do
host = new_test_host
th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(host, @cq)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
def get_responses(stub)
e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
expect(e).to be_a(Enumerator)
expect(e.collect { |r| r }).to eq(@replys)
th.join
e
end
it 'supports client-initiated ping pong', :bidi => true do
host = new_test_host
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(host, @cq)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
expect(e).to be_a(Enumerator)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it_behaves_like 'bidi streaming'
# disabled because an unresolved wire-protocol implementation feature
#
# - servers should be able initiate messaging, however, as it stand
# servers don't know if all the client metadata has been sent until
# they receive a message from the client. Without receiving all the
# metadata, the server does not accept the call, so this test hangs.
xit 'supports server-initiated ping pong', :bidi => true do
th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(host, @cq)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
return_op:true)
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, return_op:true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
expect(e).to be_a(Enumerator)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
e
end
it_behaves_like 'bidi streaming'
end
end
def run_server_streamer(hostname, expected_input, replys, status)
def run_server_streamer(hostname, expected_input, replys, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(hostname, mtx, cnd)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
@ -462,19 +477,27 @@ describe 'ClientStub' do
end
end
def run_client_streamer(hostname, expected_inputs, resp, status)
def run_client_streamer(hostname, expected_inputs, resp, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(hostname, mtx, cnd)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
def run_request_response(hostname, expected_input, resp, status)
def run_request_response(hostname, expected_input, resp, status, **kw)
wanted_metadata = kw.clone
wakey_thread do |mtx, cnd|
c = expect_server_to_be_invoked(hostname, mtx, cnd)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
@ -496,9 +519,11 @@ describe 'ClientStub' do
test_deadline = Time.now + 10 # fail tests after 10 seconds
ev = server_queue.pluck(@server_tag, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
server_call = ev.call
server_call.metadata = ev.result.metadata
finished_tag = Object.new
ev.call.accept(server_queue, finished_tag)
GRPC::ActiveCall.new(ev.call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
server_call.accept(server_queue, finished_tag)
GRPC::ActiveCall.new(server_call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
finished_tag: finished_tag)
end

@ -36,7 +36,7 @@ require 'xray/thread_dump_signal_handler'
require_relative '../port_picker'
def load_test_certs
test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata')
test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(test_root, f)).read }
end
@ -95,6 +95,7 @@ SlowStub = SlowService.rpc_stub_class
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
before(:each) do
@method = 'an_rpc_method'
@ -343,7 +344,7 @@ describe GRPC::RpcServer do
stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
stub.request_response('/unknown', req, @marshal, @unmarshal)
end
expect(&blk).to raise_error BadStatus
expect(&blk).to raise_error GRPC::BadStatus
@srv.stop
t.join
end
@ -402,7 +403,7 @@ describe GRPC::RpcServer do
stub = SlowStub.new(@host, **@client_opts)
begin
stub.an_rpc(req)
rescue BadStatus => e
rescue GRPC::BadStatus => e
_1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
end
end

Loading…
Cancel
Save