|
|
|
@ -57,21 +57,6 @@ module GRPC |
|
|
|
|
Core::Channel.new(host, kw, creds) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def self.update_with_jwt_aud_uri(a_hash, host, method) |
|
|
|
|
last_slash_idx, res = method.rindex('/'), a_hash.clone |
|
|
|
|
return res if last_slash_idx.nil? |
|
|
|
|
service_name = method[0..(last_slash_idx - 1)] |
|
|
|
|
res[:jwt_aud_uri] = "https://#{host}#{service_name}" |
|
|
|
|
res |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# check_update_metadata is used by #initialize verify that it's a Proc. |
|
|
|
|
def self.check_update_metadata(update_metadata) |
|
|
|
|
return update_metadata if update_metadata.nil? |
|
|
|
|
fail(TypeError, '!is_a?Proc') unless update_metadata.is_a?(Proc) |
|
|
|
|
update_metadata |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# Allows users of the stub to modify the propagate mask. |
|
|
|
|
# |
|
|
|
|
# This is an advanced feature for use when making calls to another gRPC |
|
|
|
@ -99,29 +84,21 @@ module GRPC |
|
|
|
|
# - :timeout |
|
|
|
|
# when present, this is the default timeout used for calls |
|
|
|
|
# |
|
|
|
|
# - :update_metadata |
|
|
|
|
# when present, this a func that takes a hash and returns a hash |
|
|
|
|
# it can be used to update metadata, i.e, remove, or amend |
|
|
|
|
# metadata values. |
|
|
|
|
# |
|
|
|
|
# @param host [String] the host the stub connects to |
|
|
|
|
# @param q [Core::CompletionQueue] used to wait for events |
|
|
|
|
# @param channel_override [Core::Channel] a pre-created channel |
|
|
|
|
# @param timeout [Number] the default timeout to use in requests |
|
|
|
|
# @param creds [Core::ChannelCredentials] the channel credentials |
|
|
|
|
# @param update_metadata a func that updates metadata as described above |
|
|
|
|
# @param kw [KeywordArgs]the channel arguments |
|
|
|
|
def initialize(host, q, |
|
|
|
|
channel_override: nil, |
|
|
|
|
timeout: nil, |
|
|
|
|
creds: nil, |
|
|
|
|
propagate_mask: nil, |
|
|
|
|
update_metadata: nil, |
|
|
|
|
**kw) |
|
|
|
|
fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue) |
|
|
|
|
@queue = q |
|
|
|
|
@ch = ClientStub.setup_channel(channel_override, host, creds, **kw) |
|
|
|
|
@update_metadata = ClientStub.check_update_metadata(update_metadata) |
|
|
|
|
alt_host = kw[Core::Channel::SSL_TARGET] |
|
|
|
|
@host = alt_host.nil? ? host : alt_host |
|
|
|
|
@propagate_mask = propagate_mask |
|
|
|
@ -166,6 +143,8 @@ module GRPC |
|
|
|
|
# @param deadline [Time] (optional) the time the request should complete |
|
|
|
|
# @param parent [Core::Call] a prior call whose reserved metadata |
|
|
|
|
# will be propagated by this one. |
|
|
|
|
# @param credentials [Core::CallCredentials] credentials to use when making |
|
|
|
|
# the call |
|
|
|
|
# @param return_op [true|false] return an Operation if true |
|
|
|
|
# @return [Object] the response received from the server |
|
|
|
|
def request_response(method, req, marshal, unmarshal, |
|
|
|
@ -173,19 +152,20 @@ module GRPC |
|
|
|
|
timeout: nil, |
|
|
|
|
return_op: false, |
|
|
|
|
parent: nil, |
|
|
|
|
credentials: nil, |
|
|
|
|
**kw) |
|
|
|
|
c = new_active_call(method, marshal, unmarshal, |
|
|
|
|
deadline: deadline, |
|
|
|
|
timeout: timeout, |
|
|
|
|
parent: parent) |
|
|
|
|
md = update_metadata(kw, method) |
|
|
|
|
return c.request_response(req, **md) unless return_op |
|
|
|
|
parent: parent, |
|
|
|
|
credentials: credentials) |
|
|
|
|
return c.request_response(req, **kw) unless return_op |
|
|
|
|
|
|
|
|
|
# return the operation view of the active_call; define #execute as a |
|
|
|
|
# new method for this instance that invokes #request_response. |
|
|
|
|
op = c.operation |
|
|
|
|
op.define_singleton_method(:execute) do |
|
|
|
|
c.request_response(req, **md) |
|
|
|
|
c.request_response(req, **kw) |
|
|
|
|
end |
|
|
|
|
op |
|
|
|
|
end |
|
|
|
@ -234,25 +214,28 @@ module GRPC |
|
|
|
|
# @param return_op [true|false] return an Operation if true |
|
|
|
|
# @param parent [Core::Call] a prior call whose reserved metadata |
|
|
|
|
# will be propagated by this one. |
|
|
|
|
# @param credentials [Core::CallCredentials] credentials to use when making |
|
|
|
|
# the call |
|
|
|
|
# @return [Object|Operation] the response received from the server |
|
|
|
|
def client_streamer(method, requests, marshal, unmarshal, |
|
|
|
|
deadline: nil, |
|
|
|
|
timeout: nil, |
|
|
|
|
return_op: false, |
|
|
|
|
parent: nil, |
|
|
|
|
credentials: nil, |
|
|
|
|
**kw) |
|
|
|
|
c = new_active_call(method, marshal, unmarshal, |
|
|
|
|
deadline: deadline, |
|
|
|
|
timeout: timeout, |
|
|
|
|
parent: parent) |
|
|
|
|
md = update_metadata(kw, method) |
|
|
|
|
return c.client_streamer(requests, **md) unless return_op |
|
|
|
|
parent: parent, |
|
|
|
|
credentials: credentials) |
|
|
|
|
return c.client_streamer(requests, **kw) unless return_op |
|
|
|
|
|
|
|
|
|
# return the operation view of the active_call; define #execute as a |
|
|
|
|
# new method for this instance that invokes #client_streamer. |
|
|
|
|
op = c.operation |
|
|
|
|
op.define_singleton_method(:execute) do |
|
|
|
|
c.client_streamer(requests, **md) |
|
|
|
|
c.client_streamer(requests, **kw) |
|
|
|
|
end |
|
|
|
|
op |
|
|
|
|
end |
|
|
|
@ -309,6 +292,8 @@ module GRPC |
|
|
|
|
# @param return_op [true|false]return an Operation if true |
|
|
|
|
# @param parent [Core::Call] a prior call whose reserved metadata |
|
|
|
|
# will be propagated by this one. |
|
|
|
|
# @param credentials [Core::CallCredentials] credentials to use when making |
|
|
|
|
# the call |
|
|
|
|
# @param blk [Block] when provided, is executed for each response |
|
|
|
|
# @return [Enumerator|Operation|nil] as discussed above |
|
|
|
|
def server_streamer(method, req, marshal, unmarshal, |
|
|
|
@ -316,20 +301,21 @@ module GRPC |
|
|
|
|
timeout: nil, |
|
|
|
|
return_op: false, |
|
|
|
|
parent: nil, |
|
|
|
|
credentials: nil, |
|
|
|
|
**kw, |
|
|
|
|
&blk) |
|
|
|
|
c = new_active_call(method, marshal, unmarshal, |
|
|
|
|
deadline: deadline, |
|
|
|
|
timeout: timeout, |
|
|
|
|
parent: parent) |
|
|
|
|
md = update_metadata(kw, method) |
|
|
|
|
return c.server_streamer(req, **md, &blk) unless return_op |
|
|
|
|
parent: parent, |
|
|
|
|
credentials: credentials) |
|
|
|
|
return c.server_streamer(req, **kw, &blk) unless return_op |
|
|
|
|
|
|
|
|
|
# return the operation view of the active_call; define #execute |
|
|
|
|
# as a new method for this instance that invokes #server_streamer |
|
|
|
|
op = c.operation |
|
|
|
|
op.define_singleton_method(:execute) do |
|
|
|
|
c.server_streamer(req, **md, &blk) |
|
|
|
|
c.server_streamer(req, **kw, &blk) |
|
|
|
|
end |
|
|
|
|
op |
|
|
|
|
end |
|
|
|
@ -424,6 +410,8 @@ module GRPC |
|
|
|
|
# @param deadline [Time] (optional) the time the request should complete |
|
|
|
|
# @param parent [Core::Call] a prior call whose reserved metadata |
|
|
|
|
# will be propagated by this one. |
|
|
|
|
# @param credentials [Core::CallCredentials] credentials to use when making |
|
|
|
|
# the call |
|
|
|
|
# @param return_op [true|false] return an Operation if true |
|
|
|
|
# @param blk [Block] when provided, is executed for each response |
|
|
|
|
# @return [Enumerator|nil|Operation] as discussed above |
|
|
|
@ -432,36 +420,28 @@ module GRPC |
|
|
|
|
timeout: nil, |
|
|
|
|
return_op: false, |
|
|
|
|
parent: nil, |
|
|
|
|
credentials: nil, |
|
|
|
|
**kw, |
|
|
|
|
&blk) |
|
|
|
|
c = new_active_call(method, marshal, unmarshal, |
|
|
|
|
deadline: deadline, |
|
|
|
|
timeout: timeout, |
|
|
|
|
parent: parent) |
|
|
|
|
md = update_metadata(kw, method) |
|
|
|
|
return c.bidi_streamer(requests, **md, &blk) unless return_op |
|
|
|
|
parent: parent, |
|
|
|
|
credentials: credentials) |
|
|
|
|
|
|
|
|
|
return c.bidi_streamer(requests, **kw, &blk) unless return_op |
|
|
|
|
|
|
|
|
|
# return the operation view of the active_call; define #execute |
|
|
|
|
# as a new method for this instance that invokes #bidi_streamer |
|
|
|
|
op = c.operation |
|
|
|
|
op.define_singleton_method(:execute) do |
|
|
|
|
c.bidi_streamer(requests, **md, &blk) |
|
|
|
|
c.bidi_streamer(requests, **kw, &blk) |
|
|
|
|
end |
|
|
|
|
op |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
private |
|
|
|
|
|
|
|
|
|
def update_metadata(kw, method) |
|
|
|
|
return kw if @update_metadata.nil? |
|
|
|
|
just_jwt_uri = self.class.update_with_jwt_aud_uri({}, @host, method) |
|
|
|
|
updated = @update_metadata.call(just_jwt_uri) |
|
|
|
|
|
|
|
|
|
# keys should be lowercase |
|
|
|
|
updated = Hash[updated.each_pair.map { |k, v| [k.downcase, v] }] |
|
|
|
|
kw.merge(updated) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# Creates a new active stub |
|
|
|
|
# |
|
|
|
|
# @param method [string] the method being called. |
|
|
|
@ -473,7 +453,8 @@ module GRPC |
|
|
|
|
def new_active_call(method, marshal, unmarshal, |
|
|
|
|
deadline: nil, |
|
|
|
|
timeout: nil, |
|
|
|
|
parent: nil) |
|
|
|
|
parent: nil, |
|
|
|
|
credentials: nil) |
|
|
|
|
if deadline.nil? |
|
|
|
|
deadline = from_relative_time(timeout.nil? ? @timeout : timeout) |
|
|
|
|
end |
|
|
|
@ -483,6 +464,7 @@ module GRPC |
|
|
|
|
method, |
|
|
|
|
nil, # host use nil, |
|
|
|
|
deadline) |
|
|
|
|
call.set_credentials credentials unless credentials.nil? |
|
|
|
|
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|