diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c index 6708ea397cb..93f36f80e8c 100644 --- a/src/ruby/ext/grpc/rb_event.c +++ b/src/ruby/ext/grpc/rb_event.c @@ -205,7 +205,7 @@ static VALUE grpc_rb_event_result(VALUE self) { return Qnil; } rb_raise(rb_eEventError, "write failed, not sure why (code=%d)", - event->data.invoke_accepted); + event->data.write_accepted); break; case GRPC_CLIENT_METADATA_READ: @@ -263,7 +263,8 @@ void Init_google_rpc_event() { rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0); /* Constants representing the completion types */ - rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore, "CompletionType"); + rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore, + "CompletionType"); rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN", INT2NUM(GRPC_QUEUE_SHUTDOWN)); rb_define_const(rb_mCompletionType, "READ", INT2NUM(GRPC_READ)); diff --git a/src/ruby/ext/grpc/rb_metadata.c b/src/ruby/ext/grpc/rb_metadata.c index 733a53a7e9c..dcacc4a976a 100644 --- a/src/ruby/ext/grpc/rb_metadata.c +++ b/src/ruby/ext/grpc/rb_metadata.c @@ -189,7 +189,8 @@ static VALUE grpc_rb_metadata_value(VALUE self) { /* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */ VALUE rb_cMetadata = Qnil; void Init_google_rpc_metadata() { - rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata", rb_cObject); + rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata", + rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc); diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index de35f278a16..95cc7fc7082 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -31,7 +31,9 @@ require 'forwardable' require 'grpc' require 'grpc/generic/bidi_call' -def assert_event_type(got, want) +def assert_event_type(ev, want) + raise OutOfTime if ev.nil? + got = ev.type raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want end @@ -52,21 +54,28 @@ module GRPC # # deadline is the absolute deadline for the call. # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # # @param call [Call] a call on which to start and invocation # @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED # @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED - def self.client_start_invoke(call, q, deadline) + def self.client_start_invoke(call, q, deadline, **kw) raise ArgumentError.new('not a call') unless call.is_a?Core::Call if !q.is_a?Core::CompletionQueue raise ArgumentError.new('not a CompletionQueue') end + call.add_metadata(kw) if kw.length > 0 invoke_accepted, client_metadata_read = Object.new, Object.new finished_tag = Object.new call.start_invoke(q, invoke_accepted, client_metadata_read, finished_tag) + # wait for the invocation to be accepted ev = q.pluck(invoke_accepted, INFINITE_FUTURE) raise OutOfTime if ev.nil? - finished_tag + + [finished_tag, client_metadata_read] end # Creates an ActiveCall. @@ -91,9 +100,11 @@ module GRPC # @param deadline [Fixnum] the deadline for the call to complete # @param finished_tag [Object] the object used as the call's finish tag, # if the call has begun + # @param read_metadata_tag [Object] the object used as the call's finish + # tag, if the call has begun # @param started [true|false] (default true) indicates if the call has begun def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil, - started: true) + read_metadata_tag: nil, started: true) raise ArgumentError.new('not a call') unless call.is_a?Core::Call if !q.is_a?Core::CompletionQueue raise ArgumentError.new('not a CompletionQueue') @@ -102,6 +113,7 @@ module GRPC @cq = q @deadline = deadline @finished_tag = finished_tag + @read_metadata_tag = read_metadata_tag @marshal = marshal @started = started @unmarshal = unmarshal @@ -180,7 +192,7 @@ module GRPC def writes_done(assert_finished=true) @call.writes_done(self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev.type, FINISH_ACCEPTED) + assert_event_type(ev, FINISH_ACCEPTED) logger.debug("Writes done: waiting for finish? #{assert_finished}") if assert_finished ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) @@ -229,7 +241,7 @@ module GRPC # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return # until the flow control allows another send on this call. ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev.type, WRITE_ACCEPTED) + assert_event_type(ev, WRITE_ACCEPTED) ev = nil end @@ -243,7 +255,7 @@ module GRPC assert_queue_is_ready @call.start_write_status(Core::Status.new(code, details), self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev.type, FINISH_ACCEPTED) + assert_event_type(ev, FINISH_ACCEPTED) logger.debug("Status sent: #{code}:'#{details}'") if assert_finished return finished @@ -257,9 +269,16 @@ module GRPC # a READ, it returns the response after unmarshalling it. On # FINISHED, it returns nil if the status is OK, otherwise raising BadStatus def remote_read + if @call.metadata.nil? && !@read_metadata_tag.nil? + ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) + assert_event_type(ev, CLIENT_METADATA_READ) + @call.metadata = ev.result + @read_metadata_tag = nil + end + @call.start_read(self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev.type, READ) + assert_event_type(ev, READ) logger.debug("received req: #{ev.result.inspect}") if !ev.result.nil? logger.debug("received req.to_s: #{ev.result.to_s}") @@ -333,10 +352,15 @@ module GRPC # request_response sends a request to a GRPC server, and returns the # response. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # # @param req [Object] the request sent to the server # @return [Object] the response received from the server - def request_response(req) - start_call unless @started + def request_response(req, **kw) + start_call(**kw) unless @started remote_send(req) writes_done(false) response = remote_read @@ -354,10 +378,14 @@ module GRPC # array of marshallable objects; in typical case it will be an Enumerable # that allows dynamic construction of the marshallable objects. # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # # @param requests [Object] an Enumerable of requests to send # @return [Object] the response received from the server - def client_streamer(requests) - start_call unless @started + def client_streamer(requests, **kw) + start_call(**kw) unless @started requests.each { |r| remote_send(r) } writes_done(false) response = remote_read @@ -377,10 +405,15 @@ module GRPC # it is executed with each response as the argument and no result is # returned. # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # any keyword arguments are treated as metadata to be sent to the server. + # # @param req [Object] the request sent to the server # @return [Enumerator|nil] a response Enumerator - def server_streamer(req) - start_call unless @started + def server_streamer(req, **kw) + start_call(**kw) unless @started remote_send(req) writes_done(false) replies = enum_for(:each_remote_read_then_finish) @@ -410,10 +443,14 @@ module GRPC # the_call#writes_done has been called, otherwise the block will loop # forever. # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # # @param requests [Object] an Enumerable of requests to send # @return [Enumerator, nil] a response Enumerator - def bidi_streamer(requests, &blk) - start_call unless @started + def bidi_streamer(requests, **kw, &blk) + start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, @finished_tag) bd.run_on_client(requests, &blk) @@ -438,8 +475,9 @@ module GRPC private - def start_call - @finished_tag = ActiveCall.client_start_invoke(@call, @cq, @deadline) + def start_call(**kw) + tags = ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw) + @finished_tag, @read_metadata_tag = tags @started = true end diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 144f9d93eca..b0e72262ffa 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -50,7 +50,7 @@ module GRPC # Any arbitrary keyword arguments are treated as channel arguments used to # configure the RPC connection to the host. # - # There are two specific keywords are that not used to configure the + # There are some specific keyword args that are not used to configure the # channel: # # - :channel_override @@ -61,21 +61,30 @@ module GRPC # - :deadline # when present, this is the default deadline used for calls # + # - :update_metadata + # when present, this a func that takes a hash and returns a hash + # it can be used to update metadata, i.e, remove, change or update + # amend metadata values. + # # @param host [String] the host the stub connects to # @param q [Core::CompletionQueue] used to wait for events # @param channel_override [Core::Channel] a pre-created channel # @param deadline [Number] the default deadline to use in requests # @param creds [Core::Credentials] secures and/or authenticates the channel - # @param kw [KeywordArgs] the channel arguments + # @param update_metadata a func that updates metadata as described above + # @param kw [KeywordArgs]the channel arguments def initialize(host, q, channel_override:nil, - deadline:DEFAULT_DEADLINE, - creds:nil, + deadline: DEFAULT_DEADLINE, + creds: nil, + update_metadata: nil, **kw) if !q.is_a?Core::CompletionQueue raise ArgumentError.new('not a CompletionQueue') end - @host = host + @queue = q + + # set the channel instance if !channel_override.nil? ch = channel_override raise ArgumentError.new('not a Channel') unless ch.is_a?(Core::Channel) @@ -86,10 +95,19 @@ module GRPC else ch = Core::Channel.new(host, kw, creds) end + @ch = ch + + @update_metadata = nil + if !update_metadata.nil? + if !update_metadata.is_a?(Proc) + raise ArgumentError.new('update_metadata is not a Proc') + end + @update_metadata = update_metadata + end + + @host = host @deadline = deadline - @ch = ch - @queue = q end # request_response sends a request to a GRPC server, and returns the @@ -117,6 +135,11 @@ module GRPC # If return_op is true, the call returns an Operation, calling execute # on the Operation returns the response. # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # # @param method [String] the RPC method to call on the GRPC server # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests @@ -125,15 +148,16 @@ module GRPC # @param return_op [true|false] (default false) return an Operation if true # @return [Object] the response received from the server def request_response(method, req, marshal, unmarshal, deadline=nil, - return_op:false) + return_op:false, **kw) c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - return c.request_response(req) unless return_op + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.request_response(req, **md) unless return_op # return the operation view of the active_call; define #execute as a # new method for this instance that invokes #request_response. op = c.operation op.define_singleton_method(:execute) do - c.request_response(req) + c.request_response(req, **md) end op end @@ -168,6 +192,11 @@ module GRPC # # If return_op is true, the call returns the response. # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # # @param method [String] the RPC method to call on the GRPC server # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests @@ -176,15 +205,16 @@ module GRPC # @param return_op [true|false] (default false) return an Operation if true # @return [Object|Operation] the response received from the server def client_streamer(method, requests, marshal, unmarshal, deadline=nil, - return_op:false) + return_op:false, **kw) c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - return c.client_streamer(requests) unless return_op + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.client_streamer(requests, **md) unless return_op # return the operation view of the active_call; define #execute as a # new method for this instance that invokes #client_streamer. op = c.operation op.define_singleton_method(:execute) do - c.client_streamer(requests) + c.client_streamer(requests, **md) end op end @@ -227,6 +257,11 @@ module GRPC # calls the given block with each response or returns an Enumerator of the # responses. # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # # @param method [String] the RPC method to call on the GRPC server # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests @@ -236,15 +271,16 @@ module GRPC # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above def server_streamer(method, req, marshal, unmarshal, deadline=nil, - return_op:false, &blk) + return_op:false, **kw, &blk) c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - return c.server_streamer(req, &blk) unless return_op + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.server_streamer(req, **md, &blk) unless return_op # return the operation view of the active_call; define #execute # as a new method for this instance that invokes #server_streamer op = c.operation op.define_singleton_method(:execute) do - c.server_streamer(req, &blk) + c.server_streamer(req, **md, &blk) end op end @@ -313,6 +349,12 @@ module GRPC # # * the deadline is exceeded # + # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # # == Return Value == # # if the return_op is false, the return value is an Enumerator of the @@ -332,15 +374,16 @@ module GRPC # @param return_op [true|false] (default false) return an Operation if true # @return [Enumerator|nil|Operation] as discussed above def bidi_streamer(method, requests, marshal, unmarshal, deadline=nil, - return_op:false, &blk) + return_op:false, **kw, &blk) c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - return c.bidi_streamer(requests, &blk) unless return_op + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.bidi_streamer(requests, **md, &blk) unless return_op # return the operation view of the active_call; define #execute # as a new method for this instance that invokes #bidi_streamer op = c.operation op.define_singleton_method(:execute) do - c.bidi_streamer(requests, &blk) + c.bidi_streamer(requests, **md, &blk) end op end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ebbf3f97809..76e7838d0f2 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -246,6 +246,7 @@ module GRPC # immediately finished_tag = Object.new call_queue = Core::CompletionQueue.new + call.metadata = new_server_rpc.metadata # store the metadata on the call call.accept(call_queue, finished_tag) # Send UNAVAILABLE if there are too many unprocessed jobs diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index ceeef2a1d8f..a8ee3c0da89 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -31,14 +31,17 @@ require 'grpc' require 'grpc/generic/active_call' require_relative '../port_picker' -ActiveCall = GRPC::ActiveCall +include GRPC::Core::StatusCodes describe GRPC::ActiveCall do + ActiveCall = GRPC::ActiveCall + Call = GRPC::Core::Call + CompletionType = GRPC::Core::CompletionType before(:each) do @pass_through = Proc.new { |x| x } @server_tag = Object.new - @server_finished_tag = Object.new + @server_done_tag, meta_tag = Object.new @tag = Object.new @client_queue = GRPC::Core::CompletionQueue.new @@ -58,11 +61,12 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) end describe '#multi_req_view' do @@ -89,11 +93,12 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is a string' @client_call.remote_send(msg) @@ -113,12 +118,13 @@ describe GRPC::ActiveCall do it 'marshals the payload using the marshal func' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) marshal = Proc.new { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is a string' client_call.remote_send(msg) @@ -133,14 +139,31 @@ describe GRPC::ActiveCall do end + describe '#client_start_invoke' do + + it 'sends keywords as metadata to the server when the are present' do + call, pass_through = make_test_call, Proc.new { |x| x } + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline, k1: 'v1', + k2: 'v2') + @server.request_call(@server_tag) + ev = @server_queue.next(deadline) + expect(ev).to_not be_nil + expect(ev.result.metadata['k1']).to eq('v1') + expect(ev.result.metadata['k2']).to eq('v2') + end + + end + describe '#remote_read' do it 'reads the response sent by a server' do call, pass_through = make_test_call, Proc.new { |x| x } - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -148,19 +171,56 @@ describe GRPC::ActiveCall do expect(client_call.remote_read).to eq('server_response') end + it 'saves metadata { status=200 } when the server adds no metadata' do + call, pass_through = make_test_call, Proc.new { |x| x } + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: done_tag, + read_metadata_tag: meta_tag) + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + server_call.remote_send('ignore me') + expect(client_call.metadata).to be_nil + client_call.remote_read + expect(client_call.metadata).to eq({':status' => '200'}) + end + + it 'saves metadata add by the server' do + call, pass_through = make_test_call, Proc.new { |x| x } + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: done_tag, + read_metadata_tag: meta_tag) + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2') + server_call.remote_send('ignore me') + expect(client_call.metadata).to be_nil + client_call.remote_read + expect(client_call.metadata).to eq({':status' => '200', 'k1' => 'v1', + 'k2' => 'v2'}) + end + + it 'get a nil msg before a status when an OK status is sent' do call, pass_through = make_test_call, Proc.new { |x| x } - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is a string' client_call.remote_send(msg) client_call.writes_done(false) server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'OK') + server_call.send_status(OK, 'OK') expect(client_call.remote_read).to eq('server_response') res = client_call.remote_read expect(res).to be_nil @@ -169,12 +229,13 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) unmarshal = Proc.new { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) # confirm the client receives the unmarshalled message msg = 'message is a string' @@ -196,11 +257,12 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is 4a string' reply = 'server_response' client_call.remote_send(msg) @@ -215,11 +277,12 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + read_metadata_tag: meta_tag, + finished_tag: done_tag) msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) @@ -231,7 +294,7 @@ describe GRPC::ActiveCall do server_call.remote_send(reply) expect(e.next).to eq(reply) end - server_call.send_status(StatusCodes::OK, 'OK') + server_call.send_status(OK, 'OK') expect { e.next }.to raise_error(StopIteration) end @@ -240,34 +303,36 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + finished_tag: done_tag, + read_metadata_tag: meta_tag) msg = 'message is a string' client_call.remote_send(msg) expect { client_call.writes_done(false) }.to_not raise_error server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') + server_call.send_status(OK, 'status code is OK') expect { server_call.finished }.to_not raise_error expect { client_call.finished }.to_not raise_error end it 'finishes ok if the server sends an early status response' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + read_metadata_tag: meta_tag, + finished_tag: done_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') + server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') expect { client_call.writes_done(false) }.to_not raise_error expect { server_call.finished }.to_not raise_error @@ -276,16 +341,17 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: finished_tag) + read_metadata_tag: meta_tag, + finished_tag: done_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') + server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') expect { client_call.writes_done(true) }.to_not raise_error expect { server_call.finished }.to_not raise_error @@ -293,19 +359,20 @@ describe GRPC::ActiveCall do end - def expect_server_to_receive(sent_text) - c = expect_server_to_be_invoked + def expect_server_to_receive(sent_text, **kw) + c = expect_server_to_be_invoked(**kw) expect(c.remote_read).to eq(sent_text) c end - def expect_server_to_be_invoked() + def expect_server_to_be_invoked(**kw) @server.request_call(@server_tag) ev = @server_queue.next(deadline) - ev.call.accept(@client_queue, @server_finished_tag) + ev.call.add_metadata(kw) + ev.call.accept(@client_queue, @server_done_tag) ActiveCall.new(ev.call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: @server_finished_tag) + finished_tag: @server_done_tag) end def make_test_call diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 4b01af95810..c76f3b291ea 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -45,7 +45,7 @@ def wakey_thread(&blk) end def load_test_certs - test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata') + test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') files = ['ca.pem', 'server1.key', 'server1.pem'] files.map { |f| File.open(File.join(test_root, f)).read } end @@ -136,14 +136,32 @@ describe 'ClientStub' do @sent_msg, @resp = 'a_msg', 'a_reply' end - describe 'without a call operation' do + shared_examples 'request response' do - it 'should send a request to/receive a_reply from a server' do + it 'should send a request to/receive a reply from a server' do host = new_test_host th = run_request_response(host, @sent_msg, @resp, @pass) stub = GRPC::ClientStub.new(host, @cq) - resp = stub.request_response(@method, @sent_msg, NOOP, NOOP) - expect(resp).to eq(@resp) + expect(get_response(stub)).to eq(@resp) + th.join + end + + it 'should send metadata to the server ok' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @pass, k1: 'v1', + k2: 'v2') + stub = GRPC::ClientStub.new(host, @cq) + expect(get_response(stub)).to eq(@resp) + th.join + end + + it 'should update the sent metadata with a provided metadata updater' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @pass, + k1: 'updated-v1', k2: 'v2') + update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md } + stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md) + expect(get_response(stub)).to eq(@resp) th.join end @@ -151,10 +169,8 @@ describe 'ClientStub' do alt_host = new_test_host th = run_request_response(alt_host, @sent_msg, @resp, @pass) ch = GRPC::Core::Channel.new(alt_host, nil) - stub = GRPC::ClientStub.new('ignored-host', @cq, - channel_override:ch) - resp = stub.request_response(@method, @sent_msg, NOOP, NOOP) - expect(resp).to eq(@resp) + stub = GRPC::ClientStub.new('ignored-host', @cq, channel_override:ch) + expect(get_response(stub)).to eq(@resp) th.join end @@ -162,89 +178,73 @@ describe 'ClientStub' do host = new_test_host th = run_request_response(host, @sent_msg, @resp, @fail) stub = GRPC::ClientStub.new(host, @cq) - blk = Proc.new do - stub.request_response(@method, @sent_msg, NOOP, NOOP) - end + blk = Proc.new { get_response(stub) } expect(&blk).to raise_error(GRPC::BadStatus) th.join end end - describe 'via a call operation' do + describe 'without a call operation' do - it 'should send a request to/receive a_reply from a server' do - host = new_test_host - th = run_request_response(host, @sent_msg, @resp, @pass) - stub = GRPC::ClientStub.new(host, @cq) - op = stub.request_response(@method, @sent_msg, NOOP, NOOP, - return_op:true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - resp = op.execute() - expect(resp).to eq(@resp) - th.join + def get_response(stub) + stub.request_response(@method, @sent_msg, NOOP, NOOP, k1: 'v1', + k2: 'v2') end - it 'should raise an error if the status is not OK' do - host = new_test_host - th = run_request_response(host, @sent_msg, @resp, @fail) - stub = GRPC::ClientStub.new(host, @cq) + it_behaves_like 'request response' + + end + + describe 'via a call operation' do + + def get_response(stub) op = stub.request_response(@method, @sent_msg, NOOP, NOOP, - return_op:true) + return_op:true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) - blk = Proc.new do - op.execute() - end - expect(&blk).to raise_error(GRPC::BadStatus) - th.join + op.execute() end + it_behaves_like 'request response' + end end describe '#client_streamer' do - before(:each) do - @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } - @resp = 'a_reply' - end + shared_examples 'client streaming' do - describe 'without a call operation' do + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } + @resp = 'a_reply' + end it 'should send requests to/receive a reply from a server' do host = new_test_host th = run_client_streamer(host, @sent_msgs, @resp, @pass) stub = GRPC::ClientStub.new(host, @cq) - resp = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP) - expect(resp).to eq(@resp) + expect(get_response(stub)).to eq(@resp) th.join end - it 'should raise an error if the status is not ok' do + it 'should send metadata to the server ok' do host = new_test_host - th = run_client_streamer(host, @sent_msgs, @resp, @fail) + th = run_client_streamer(host, @sent_msgs, @resp, @pass, k1: 'v1', + k2: 'v2') stub = GRPC::ClientStub.new(host, @cq) - blk = Proc.new do - stub.client_streamer(@method, @sent_msgs, NOOP, NOOP) - end - expect(&blk).to raise_error(BadStatus) + expect(get_response(stub)).to eq(@resp) th.join end - end - describe 'via a call operation' do - - it 'should send requests to/receive a reply from a server' do + it 'should update the sent metadata with a provided metadata updater' do host = new_test_host - th = run_client_streamer(host, @sent_msgs, @resp, @pass) - stub = GRPC::ClientStub.new(host, @cq) - op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, - return_op:true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - resp = op.execute() - expect(resp).to eq(@resp) + th = run_client_streamer(host, @sent_msgs, @resp, @pass, + k1: 'updated-v1', k2: 'v2') + update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md } + stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md) + expect(get_response(stub)).to eq(@resp) th.join end @@ -252,36 +252,53 @@ describe 'ClientStub' do host = new_test_host th = run_client_streamer(host, @sent_msgs, @resp, @fail) stub = GRPC::ClientStub.new(host, @cq) + blk = Proc.new { get_response(stub) } + expect(&blk).to raise_error(GRPC::BadStatus) + th.join + end + + end + + describe 'without a call operation' do + + def get_response(stub) + stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, k1: 'v1', + k2: 'v2') + end + + it_behaves_like 'client streaming' + + end + + describe 'via a call operation' do + + def get_response(stub) op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, - return_op:true) + return_op:true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) - blk = Proc.new do - op.execute() - end - expect(&blk).to raise_error(BadStatus) - th.join + resp = op.execute() end + it_behaves_like 'client streaming' + end end describe '#server_streamer' do - before(:each) do - @sent_msg = 'a_msg' - @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } - end + shared_examples 'server streaming' do - describe 'without a call operation' do + before(:each) do + @sent_msg = 'a_msg' + @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } + end it 'should send a request to/receive replies from a server' do host = new_test_host th = run_server_streamer(host, @sent_msg, @replys, @pass) stub = GRPC::ClientStub.new(host, @cq) - e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP) - expect(e).to be_a(Enumerator) - expect(e.collect { |r| r }).to eq(@replys) + expect(get_responses(stub).collect { |r| r }).to eq(@replys) th.join end @@ -289,60 +306,79 @@ describe 'ClientStub' do host = new_test_host th = run_server_streamer(host, @sent_msg, @replys, @fail) stub = GRPC::ClientStub.new(host, @cq) - e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP) - expect(e).to be_a(Enumerator) - expect { e.collect { |r| r } }.to raise_error(BadStatus) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end - end - - describe 'via a call operation' do - - it 'should send a request to/receive replies from a server' do + it 'should send metadata to the server ok' do host = new_test_host - th = run_server_streamer(host, @sent_msg, @replys, @pass) + th = run_server_streamer(host, @sent_msg, @replys, @fail, k1: 'v1', + k2: 'v2') stub = GRPC::ClientStub.new(host, @cq) - op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, - return_op:true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - e = op.execute() - expect(e).to be_a(Enumerator) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end - it 'should raise an error if the status is not ok' do + it 'should update the sent metadata with a provided metadata updater' do host = new_test_host - th = run_server_streamer(host, @sent_msg, @replys, @fail) - stub = GRPC::ClientStub.new(host, @cq) + th = run_server_streamer(host, @sent_msg, @replys, @pass, + k1: 'updated-v1', k2: 'v2') + update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md } + stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@replys) + th.join + end + + end + + describe 'without a call operation' do + + def get_responses(stub) + e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, k1: 'v1', + k2: 'v2') + expect(e).to be_a(Enumerator) + e + end + + it_behaves_like 'server streaming' + + end + + describe 'via a call operation' do + + def get_responses(stub) op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, - return_op:true) + return_op:true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute() expect(e).to be_a(Enumerator) - expect { e.collect { |r| r } }.to raise_error(BadStatus) - th.join + e end + it_behaves_like 'server streaming' + end end describe '#bidi_streamer' do - before(:each) do - @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } - @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } - end - describe 'without a call operation' do + shared_examples 'bidi streaming' do + + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } + @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } + end it 'supports sending all the requests first', :bidi => true do host = new_test_host th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(host, @cq) - e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) - expect(e).to be_a(Enumerator) + e = get_responses(stub) expect(e.collect { |r| r }).to eq(@replys) th.join end @@ -351,8 +387,7 @@ describe 'ClientStub' do host = new_test_host th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) stub = GRPC::ClientStub.new(host, @cq) - e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) - expect(e).to be_a(Enumerator) + e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end @@ -367,68 +402,48 @@ describe 'ClientStub' do host = new_test_host th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) stub = GRPC::ClientStub.new(host, @cq) - e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) - expect(e).to be_a(Enumerator) + e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end end - describe 'via a call operation' do + describe 'without a call operation' do - it 'supports sending all the requests first', :bidi => true do - host = new_test_host - th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, - @pass) - stub = GRPC::ClientStub.new(host, @cq) - op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, - return_op:true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - e = op.execute + def get_responses(stub) + e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) expect(e).to be_a(Enumerator) - expect(e.collect { |r| r }).to eq(@replys) - th.join + e end - it 'supports client-initiated ping pong', :bidi => true do - host = new_test_host - th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) - stub = GRPC::ClientStub.new(host, @cq) - op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, - return_op:true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - e = op.execute - expect(e).to be_a(Enumerator) - expect(e.collect { |r| r }).to eq(@sent_msgs) - th.join - end + it_behaves_like 'bidi streaming' - # disabled because an unresolved wire-protocol implementation feature - # - # - servers should be able initiate messaging, however, as it stand - # servers don't know if all the client metadata has been sent until - # they receive a message from the client. Without receiving all the - # metadata, the server does not accept the call, so this test hangs. - xit 'supports server-initiated ping pong', :bidi => true do - th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) - stub = GRPC::ClientStub.new(host, @cq) - op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, - return_op:true) + end + + describe 'via a call operation' do + + def get_responses(stub) + op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, return_op:true) expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute expect(e).to be_a(Enumerator) - expect(e.collect { |r| r }).to eq(@sent_msgs) - th.join + e end + it_behaves_like 'bidi streaming' + end end - def run_server_streamer(hostname, expected_input, replys, status) + def run_server_streamer(hostname, expected_input, replys, status, **kw) + wanted_metadata = kw.clone wakey_thread do |mtx, cnd| c = expect_server_to_be_invoked(hostname, mtx, cnd) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) @@ -462,19 +477,27 @@ describe 'ClientStub' do end end - def run_client_streamer(hostname, expected_inputs, resp, status) + def run_client_streamer(hostname, expected_inputs, resp, status, **kw) + wanted_metadata = kw.clone wakey_thread do |mtx, cnd| c = expect_server_to_be_invoked(hostname, mtx, cnd) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end - def run_request_response(hostname, expected_input, resp, status) + def run_request_response(hostname, expected_input, resp, status, **kw) + wanted_metadata = kw.clone wakey_thread do |mtx, cnd| c = expect_server_to_be_invoked(hostname, mtx, cnd) expect(c.remote_read).to eq(expected_input) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end @@ -496,9 +519,11 @@ describe 'ClientStub' do test_deadline = Time.now + 10 # fail tests after 10 seconds ev = server_queue.pluck(@server_tag, INFINITE_FUTURE) raise OutOfTime if ev.nil? + server_call = ev.call + server_call.metadata = ev.result.metadata finished_tag = Object.new - ev.call.accept(server_queue, finished_tag) - GRPC::ActiveCall.new(ev.call, server_queue, NOOP, NOOP, INFINITE_FUTURE, + server_call.accept(server_queue, finished_tag) + GRPC::ActiveCall.new(server_call, server_queue, NOOP, NOOP, INFINITE_FUTURE, finished_tag: finished_tag) end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index fc579a6c3f9..7c9b074abfc 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -36,7 +36,7 @@ require 'xray/thread_dump_signal_handler' require_relative '../port_picker' def load_test_certs - test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata') + test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') files = ['ca.pem', 'server1.key', 'server1.pem'] files.map { |f| File.open(File.join(test_root, f)).read } end @@ -95,6 +95,7 @@ SlowStub = SlowService.rpc_stub_class describe GRPC::RpcServer do RpcServer = GRPC::RpcServer + StatusCodes = GRPC::Core::StatusCodes before(:each) do @method = 'an_rpc_method' @@ -343,7 +344,7 @@ describe GRPC::RpcServer do stub = GRPC::ClientStub.new(@host, cq, **@client_opts) stub.request_response('/unknown', req, @marshal, @unmarshal) end - expect(&blk).to raise_error BadStatus + expect(&blk).to raise_error GRPC::BadStatus @srv.stop t.join end @@ -402,7 +403,7 @@ describe GRPC::RpcServer do stub = SlowStub.new(@host, **@client_opts) begin stub.an_rpc(req) - rescue BadStatus => e + rescue GRPC::BadStatus => e _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE end end