|
|
|
@ -129,27 +129,36 @@ def nulls(l) |
|
|
|
|
[].pack('x' * l).force_encoding('ascii-8bit') |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. |
|
|
|
|
class EnumeratorQueue |
|
|
|
|
extend Forwardable |
|
|
|
|
def_delegators :@q, :push |
|
|
|
|
|
|
|
|
|
def initialize(sentinel) |
|
|
|
|
@q = Queue.new |
|
|
|
|
@sentinel = sentinel |
|
|
|
|
end |
|
|
|
|
# A FullDuplexEnumerator passes requests to a block and yields generated responses |
|
|
|
|
class FullDuplexEnumerator |
|
|
|
|
include Grpc::Testing |
|
|
|
|
include Grpc::Testing::PayloadType |
|
|
|
|
|
|
|
|
|
def initialize(requests) |
|
|
|
|
@requests = requests |
|
|
|
|
end |
|
|
|
|
def each_item |
|
|
|
|
return enum_for(:each_item) unless block_given? |
|
|
|
|
loop do |
|
|
|
|
r = @q.pop |
|
|
|
|
break if r.equal?(@sentinel) |
|
|
|
|
fail r if r.is_a? Exception |
|
|
|
|
yield r |
|
|
|
|
GRPC.logger.info('interop-server: started receiving') |
|
|
|
|
begin |
|
|
|
|
cls = StreamingOutputCallResponse |
|
|
|
|
@requests.each do |req| |
|
|
|
|
req.response_parameters.each do |params| |
|
|
|
|
resp_size = params.size |
|
|
|
|
GRPC.logger.info("read a req, response size is #{resp_size}") |
|
|
|
|
yield cls.new(payload: Payload.new(type: req.response_type, |
|
|
|
|
body: nulls(resp_size))) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
GRPC.logger.info('interop-server: finished receiving') |
|
|
|
|
rescue StandardError => e |
|
|
|
|
GRPC.logger.info('interop-server: failed') |
|
|
|
|
GRPC.logger.warn(e) |
|
|
|
|
fail e |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A runnable implementation of the schema-specified testing service, with each |
|
|
|
|
# service method implemented as required by the interop testing spec. |
|
|
|
|
class TestTarget < Grpc::Testing::TestService::Service |
|
|
|
@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service |
|
|
|
|
|
|
|
|
|
def full_duplex_call(reqs) |
|
|
|
|
# reqs is a lazy Enumerator of the requests sent by the client. |
|
|
|
|
q = EnumeratorQueue.new(self) |
|
|
|
|
cls = StreamingOutputCallResponse |
|
|
|
|
Thread.new do |
|
|
|
|
begin |
|
|
|
|
GRPC.logger.info('interop-server: started receiving') |
|
|
|
|
reqs.each do |req| |
|
|
|
|
req.response_parameters.each do |params| |
|
|
|
|
resp_size = params.size |
|
|
|
|
GRPC.logger.info("read a req, response size is #{resp_size}") |
|
|
|
|
resp = cls.new(payload: Payload.new(type: req.response_type, |
|
|
|
|
body: nulls(resp_size))) |
|
|
|
|
q.push(resp) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
GRPC.logger.info('interop-server: finished receiving') |
|
|
|
|
q.push(self) |
|
|
|
|
rescue StandardError => e |
|
|
|
|
GRPC.logger.info('interop-server: failed') |
|
|
|
|
GRPC.logger.warn(e) |
|
|
|
|
q.push(e) # share the exception with the enumerator |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
q.each_item |
|
|
|
|
FullDuplexEnumerator.new(reqs).each_item |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def half_duplex_call(reqs) |
|
|
|
|
# TODO: update with unique behaviour of the half_duplex_call if that's |
|
|
|
|
# ever required by any of the tests. |
|
|
|
|