Merge pull request #2892 from tbetbetbe/grpc-ruby-enable-propagation-between-calls

Grpc ruby enable propagation between calls
pull/2908/head
Michael Lumish 9 years ago
commit 62fee02cef
  1. 42
      src/ruby/ext/grpc/rb_channel.c
  2. 2
      src/ruby/grpc.gemspec
  3. 61
      src/ruby/lib/grpc/generic/client_stub.rb
  4. 2
      src/ruby/spec/call_spec.rb
  5. 4
      src/ruby/spec/channel_spec.rb
  6. 2
      src/ruby/spec/client_server_spec.rb
  7. 2
      src/ruby/spec/generic/active_call_spec.rb

@ -195,18 +195,28 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
/* Create a call given a grpc_channel, in order to call method. The request /* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */ is not sent until grpc_call_invoke is called. */
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method, static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
VALUE host, VALUE deadline) { VALUE parent, VALUE mask,
VALUE method, VALUE host,
VALUE deadline) {
VALUE res = Qnil; VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL; grpc_call *call = NULL;
grpc_call *parent_call = NULL;
grpc_channel *ch = NULL; grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL; grpc_completion_queue *cq = NULL;
int flags = GRPC_PROPAGATE_DEFAULTS;
char *method_chars = StringValueCStr(method); char *method_chars = StringValueCStr(method);
char *host_chars = NULL; char *host_chars = NULL;
if (host != Qnil) { if (host != Qnil) {
host_chars = StringValueCStr(host); host_chars = StringValueCStr(host);
} }
if (mask != Qnil) {
flags = NUM2UINT(mask);
}
if (parent != Qnil) {
parent_call = grpc_rb_get_wrapped_call(parent);
}
cq = grpc_rb_get_wrapped_completion_queue(cqueue); cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@ -216,10 +226,10 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return Qnil; return Qnil;
} }
call = grpc_channel_create_call(ch, NULL, GRPC_PROPAGATE_DEFAULTS, cq, call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars,
method_chars, host_chars, host_chars, grpc_rb_time_timeval(
grpc_rb_time_timeval(deadline, deadline,
/* absolute time */ 0)); /* absolute time */ 0));
if (call == NULL) { if (call == NULL) {
rb_raise(rb_eRuntimeError, "cannot create call with method %s", rb_raise(rb_eRuntimeError, "cannot create call with method %s",
method_chars); method_chars);
@ -237,6 +247,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return res; return res;
} }
/* Closes the channel, calling it's destroy method */ /* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) { static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
@ -268,6 +279,22 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
return res; return res;
} }
static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks = rb_define_module_under(
grpc_rb_mGrpcCore, "PropagateMasks");
rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
UINT2NUM(GRPC_PROPAGATE_DEADLINE));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
UINT2NUM(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_TRACING_CONTEXT",
UINT2NUM(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
rb_define_const(grpc_rb_mPropagateMasks, "CANCELLATION",
UINT2NUM(GRPC_PROPAGATE_CANCELLATION));
rb_define_const(grpc_rb_mPropagateMasks, "DEFAULTS",
UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
}
void Init_grpc_channel() { void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel = grpc_rb_cChannel =
@ -283,7 +310,7 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */ /* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "create_call", rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 4); grpc_rb_channel_create_call, 6);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy"); rb_define_alias(grpc_rb_cChannel, "close", "destroy");
@ -299,6 +326,7 @@ void Init_grpc_channel() {
ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS))); ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH", rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH))); ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
Init_grpc_propagate_masks();
} }
/* Gets the wrapped channel from the ruby wrapper */ /* Gets the wrapped channel from the ruby wrapper */

@ -22,7 +22,7 @@ Gem::Specification.new do |s|
s.files += Dir.glob('bin/**/*') s.files += Dir.glob('bin/**/*')
s.test_files = Dir.glob('spec/**/*') s.test_files = Dir.glob('spec/**/*')
%w(math noproto).each do |b| %w(math noproto).each do |b|
s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ] s.executables += ["#{b}_client.rb", "#{b}_server.rb"]
end end
s.require_paths = %w( bin lib ) s.require_paths = %w( bin lib )
s.platform = Gem::Platform::RUBY s.platform = Gem::Platform::RUBY

@ -32,6 +32,8 @@ require 'grpc/version'
# GRPC contains the General RPC module. # GRPC contains the General RPC module.
module GRPC module GRPC
# rubocop:disable Metrics/ParameterLists
# ClientStub represents an endpoint used to send requests to GRPC servers. # ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub class ClientStub
include Core::StatusCodes include Core::StatusCodes
@ -68,6 +70,12 @@ module GRPC
update_metadata update_metadata
end end
# Allows users of the stub to modify the propagate mask.
#
# This is an advanced feature for use when making calls to another gRPC
# server whilst running in the handler of an existing one.
attr_writer :propagate_mask
# Creates a new ClientStub. # Creates a new ClientStub.
# #
# Minimally, a stub is created with the just the host of the gRPC service # Minimally, a stub is created with the just the host of the gRPC service
@ -91,8 +99,8 @@ module GRPC
# #
# - :update_metadata # - :update_metadata
# when present, this a func that takes a hash and returns a hash # when present, this a func that takes a hash and returns a hash
# it can be used to update metadata, i.e, remove, change or update # it can be used to update metadata, i.e, remove, or amend
# amend metadata values. # metadata values.
# #
# @param host [String] the host the stub connects to # @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events # @param q [Core::CompletionQueue] used to wait for events
@ -105,6 +113,7 @@ module GRPC
channel_override: nil, channel_override: nil,
timeout: nil, timeout: nil,
creds: nil, creds: nil,
propagate_mask: nil,
update_metadata: nil, update_metadata: nil,
**kw) **kw)
fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue) fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@ -113,6 +122,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata) @update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET] alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host @host = alt_host.nil? ? host : alt_host
@propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end end
@ -151,11 +161,15 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server # @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, timeout = nil, def request_response(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw) return_op: false,
c = new_active_call(method, marshal, unmarshal, timeout) parent: parent,
**kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op return c.request_response(req, **md) unless return_op
@ -210,10 +224,14 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @return [Object|Operation] the response received from the server # @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, timeout = nil, def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw) return_op: false,
c = new_active_call(method, marshal, unmarshal, timeout) parent: nil,
**kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op return c.client_streamer(requests, **md) unless return_op
@ -276,11 +294,16 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true # @param return_op [true|false]return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param blk [Block] when provided, is executed for each response # @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above # @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, timeout = nil, def server_streamer(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk) return_op: false,
c = new_active_call(method, marshal, unmarshal, timeout) parent: nil,
**kw,
&blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op return c.server_streamer(req, **md, &blk) unless return_op
@ -381,12 +404,17 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds # @param timeout [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response # @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param return_op [true|false] return an Operation if true # @param return_op [true|false] return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|nil|Operation] as discussed above # @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk) return_op: false,
c = new_active_call(method, marshal, unmarshal, timeout) parent: nil,
**kw,
&blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op return c.bidi_streamer(requests, **md, &blk) unless return_op
@ -407,10 +435,17 @@ module GRPC
# @param method [string] the method being called. # @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests # @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param parent [Grpc::Call] a parent call, available when calls are
# made from server
# @param timeout [TimeConst] # @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal, timeout = nil) def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil)
deadline = from_relative_time(timeout.nil? ? @timeout : timeout) deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
call = @ch.create_call(@queue, method, nil, deadline) call = @ch.create_call(@queue,
parent, # parent call
@propagate_mask, # propagation options
method,
nil, # host use nil,
deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false) ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end end
end end

@ -137,7 +137,7 @@ describe GRPC::Core::Call do
end end
def make_test_call def make_test_call
@ch.create_call(client_queue, 'dummy_method', nil, deadline) @ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline)
end end
def deadline def deadline

@ -117,7 +117,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5 deadline = Time.now + 5
blk = proc do blk = proc do
ch.create_call(cq, 'dummy_method', nil, deadline) ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end end
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
end end
@ -128,7 +128,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5 deadline = Time.now + 5
blk = proc do blk = proc do
ch.create_call(cq, 'dummy_method', nil, deadline) ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end end
expect(&blk).to raise_error(RuntimeError) expect(&blk).to raise_error(RuntimeError)
end end

@ -61,7 +61,7 @@ shared_context 'setup: tags' do
end end
def new_client_call def new_client_call
@ch.create_call(@client_queue, '/method', nil, deadline) @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end end
end end

@ -338,7 +338,7 @@ describe GRPC::ActiveCall do
end end
def make_test_call def make_test_call
@ch.create_call(@client_queue, '/method', nil, deadline) @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end end
def deadline def deadline

Loading…
Cancel
Save