Merge pull request #567 from tbetbetbe/grpc_ruby_rename_modules

Grpc ruby rename modules
pull/366/merge
Michael Lumish 10 years ago
commit 3c665f56be
  1. 4
      src/ruby/ext/grpc/rb_byte_buffer.c
  2. 2
      src/ruby/ext/grpc/rb_byte_buffer.h
  3. 12
      src/ruby/ext/grpc/rb_call.c
  4. 2
      src/ruby/ext/grpc/rb_call.h
  5. 4
      src/ruby/ext/grpc/rb_channel.c
  6. 2
      src/ruby/ext/grpc/rb_channel.h
  7. 4
      src/ruby/ext/grpc/rb_completion_queue.c
  8. 2
      src/ruby/ext/grpc/rb_completion_queue.h
  9. 4
      src/ruby/ext/grpc/rb_credentials.c
  10. 2
      src/ruby/ext/grpc/rb_credentials.h
  11. 8
      src/ruby/ext/grpc/rb_event.c
  12. 2
      src/ruby/ext/grpc/rb_event.h
  13. 46
      src/ruby/ext/grpc/rb_grpc.c
  14. 7
      src/ruby/ext/grpc/rb_grpc.h
  15. 4
      src/ruby/ext/grpc/rb_metadata.c
  16. 2
      src/ruby/ext/grpc/rb_metadata.h
  17. 4
      src/ruby/ext/grpc/rb_server.c
  18. 2
      src/ruby/ext/grpc/rb_server.h
  19. 4
      src/ruby/ext/grpc/rb_server_credentials.c
  20. 2
      src/ruby/ext/grpc/rb_server_credentials.h
  21. 3
      src/ruby/lib/grpc.rb
  22. 60
      src/ruby/lib/grpc/auth/compute_engine.rb
  23. 42
      src/ruby/lib/grpc/auth/service_account.rb
  24. 21
      src/ruby/lib/grpc/core/event.rb
  25. 69
      src/ruby/lib/grpc/core/time_consts.rb
  26. 50
      src/ruby/lib/grpc/errors.rb
  27. 904
      src/ruby/lib/grpc/generic/active_call.rb
  28. 318
      src/ruby/lib/grpc/generic/bidi_call.rb
  29. 704
      src/ruby/lib/grpc/generic/client_stub.rb
  30. 201
      src/ruby/lib/grpc/generic/rpc_desc.rb
  31. 640
      src/ruby/lib/grpc/generic/rpc_server.rb
  32. 312
      src/ruby/lib/grpc/generic/service.rb
  33. 6
      src/ruby/lib/grpc/logconfig.rb
  34. 8
      src/ruby/lib/grpc/version.rb
  35. 4
      src/ruby/spec/auth/compute_engine_spec.rb
  36. 4
      src/ruby/spec/auth/service_account_spec.rb

@ -202,9 +202,9 @@ static VALUE grpc_rb_byte_buffer_init(VALUE self, VALUE src) {
/* rb_cByteBuffer is the ruby class that proxies grpc_byte_buffer. */
VALUE rb_cByteBuffer = Qnil;
void Init_google_rpc_byte_buffer() {
void Init_grpc_byte_buffer() {
rb_cByteBuffer =
rb_define_class_under(rb_mGoogleRpcCore, "ByteBuffer", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "ByteBuffer", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cByteBuffer, grpc_rb_byte_buffer_alloc);

@ -42,7 +42,7 @@
extern VALUE rb_cByteBuffer;
/* Initializes the ByteBuffer class. */
void Init_google_rpc_byte_buffer();
void Init_grpc_byte_buffer();
/* grpc_rb_byte_buffer_create_with_mark creates a grpc_rb_byte_buffer with a
* ruby mark object that will be kept alive while the byte_buffer is alive. */

@ -449,9 +449,9 @@ VALUE rb_cCall = Qnil;
operations; */
VALUE rb_eCallError = Qnil;
void Init_google_rpc_error_codes() {
void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE rb_RpcErrors = rb_define_module_under(rb_mGoogleRpcCore, "RpcErrors");
VALUE rb_RpcErrors = rb_define_module_under(rb_mGrpcCore, "RpcErrors");
rb_define_const(rb_RpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
rb_define_const(rb_RpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
rb_define_const(rb_RpcErrors, "NOT_ON_SERVER",
@ -500,11 +500,11 @@ void Init_google_rpc_error_codes() {
rb_obj_freeze(rb_error_code_details);
}
void Init_google_rpc_call() {
void Init_grpc_call() {
/* CallError inherits from Exception to signal that it is non-recoverable */
rb_eCallError =
rb_define_class_under(rb_mGoogleRpcCore, "CallError", rb_eException);
rb_cCall = rb_define_class_under(rb_mGoogleRpcCore, "Call", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "CallError", rb_eException);
rb_cCall = rb_define_class_under(rb_mGrpcCore, "Call", rb_cObject);
/* Prevent allocation or inialization of the Call class */
rb_define_alloc_func(rb_cCall, grpc_rb_cannot_alloc);
@ -542,7 +542,7 @@ void Init_google_rpc_call() {
hash_all_calls = rb_hash_new();
rb_define_const(rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls);
Init_google_rpc_error_codes();
Init_grpc_error_codes();
}
/* Gets the call from the ruby object */

@ -54,6 +54,6 @@ extern VALUE rb_cCall;
extern VALUE rb_eCallError;
/* Initializes the Call class. */
void Init_google_rpc_call();
void Init_grpc_call();
#endif /* GRPC_RB_CALL_H_ */

@ -227,9 +227,9 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
/* rb_cChannel is the ruby class that proxies grpc_channel. */
VALUE rb_cChannel = Qnil;
void Init_google_rpc_channel() {
void Init_grpc_channel() {
rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
rb_cChannel = rb_define_class_under(rb_mGoogleRpcCore, "Channel", rb_cObject);
rb_cChannel = rb_define_class_under(rb_mGrpcCore, "Channel", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cChannel, grpc_rb_channel_alloc);

@ -41,7 +41,7 @@
extern VALUE rb_cChannel;
/* Initializes the Channel class. */
void Init_google_rpc_channel();
void Init_grpc_channel();
/* Gets the wrapped channel from the ruby wrapper */
grpc_channel* grpc_rb_get_wrapped_channel(VALUE v);

@ -159,9 +159,9 @@ static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
/* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */
VALUE rb_cCompletionQueue = Qnil;
void Init_google_rpc_completion_queue() {
void Init_grpc_completion_queue() {
rb_cCompletionQueue =
rb_define_class_under(rb_mGoogleRpcCore, "CompletionQueue", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "CompletionQueue", rb_cObject);
/* constructor: uses an alloc func without an initializer. Using a simple
alloc func works here as the grpc header does not specify any args for

@ -45,6 +45,6 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
extern VALUE rb_cCompletionQueue;
/* Initializes the CompletionQueue class. */
void Init_google_rpc_completion_queue();
void Init_grpc_completion_queue();
#endif /* GRPC_RB_COMPLETION_QUEUE_H_ */

@ -245,9 +245,9 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) {
/* rb_cCredentials is the ruby class that proxies grpc_credentials. */
VALUE rb_cCredentials = Qnil;
void Init_google_rpc_credentials() {
void Init_grpc_credentials() {
rb_cCredentials =
rb_define_class_under(rb_mGoogleRpcCore, "Credentials", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "Credentials", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cCredentials, grpc_rb_credentials_alloc);

@ -42,7 +42,7 @@
extern VALUE rb_cCredentials;
/* Initializes the ruby Credentials class. */
void Init_google_rpc_credentials();
void Init_grpc_credentials();
/* Gets the wrapped credentials from the ruby wrapper */
grpc_credentials* grpc_rb_get_wrapped_credentials(VALUE v);

@ -312,10 +312,10 @@ VALUE rb_cEvent = Qnil;
rpc event processing. */
VALUE rb_eEventError = Qnil;
void Init_google_rpc_event() {
void Init_grpc_event() {
rb_eEventError =
rb_define_class_under(rb_mGoogleRpcCore, "EventError", rb_eStandardError);
rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "EventError", rb_eStandardError);
rb_cEvent = rb_define_class_under(rb_mGrpcCore, "Event", rb_cObject);
/* Prevent allocation or inialization from ruby. */
rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc);
@ -332,7 +332,7 @@ void Init_google_rpc_event() {
/* Constants representing the completion types */
rb_mCompletionType =
rb_define_module_under(rb_mGoogleRpcCore, "CompletionType");
rb_define_module_under(rb_mGrpcCore, "CompletionType");
rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN",
INT2NUM(GRPC_QUEUE_SHUTDOWN));
rb_define_const(rb_mCompletionType, "OP_COMPLETE", INT2NUM(GRPC_OP_COMPLETE));

@ -48,6 +48,6 @@ extern VALUE rb_eEventError;
VALUE grpc_rb_new_event(grpc_event *ev);
/* Initializes the Event and EventError classes. */
void Init_google_rpc_event();
void Init_grpc_event();
#endif /* GRPC_RB_EVENT_H_ */

@ -153,10 +153,10 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) {
return t;
}
void Init_google_status_codes() {
void Init_grpc_status_codes() {
/* Constants representing the status codes or grpc_status_code in status.h */
VALUE rb_mStatusCodes =
rb_define_module_under(rb_mGoogleRpcCore, "StatusCodes");
rb_define_module_under(rb_mGrpcCore, "StatusCodes");
rb_define_const(rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK));
rb_define_const(rb_mStatusCodes, "CANCELLED", INT2NUM(GRPC_STATUS_CANCELLED));
rb_define_const(rb_mStatusCodes, "UNKNOWN", INT2NUM(GRPC_STATUS_UNKNOWN));
@ -214,11 +214,11 @@ VALUE grpc_rb_time_val_to_s(VALUE self) {
}
/* Adds a module with constants that map to gpr's static timeval structs. */
void Init_google_time_consts() {
void Init_grpc_time_consts() {
VALUE rb_mTimeConsts =
rb_define_module_under(rb_mGoogleRpcCore, "TimeConsts");
rb_define_module_under(rb_mGrpcCore, "TimeConsts");
rb_cTimeVal =
rb_define_class_under(rb_mGoogleRpcCore, "TimeSpec", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "TimeSpec", rb_cObject);
rb_define_const(rb_mTimeConsts, "ZERO",
Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE,
(void *)&gpr_time_0));
@ -240,37 +240,35 @@ void Init_google_time_consts() {
void grpc_rb_shutdown(void *vm) { grpc_shutdown(); }
/* Initialize the Google RPC module structs */
/* Initialize the GRPC module structs */
/* rb_sNewServerRpc is the struct that holds new server rpc details. */
VALUE rb_sNewServerRpc = Qnil;
/* rb_sStatus is the struct that holds status details. */
VALUE rb_sStatus = Qnil;
/* Initialize the Google RPC module. */
VALUE rb_mGoogle = Qnil;
VALUE rb_mGoogleRPC = Qnil;
VALUE rb_mGoogleRpcCore = Qnil;
/* Initialize the GRPC module. */
VALUE rb_mGRPC = Qnil;
VALUE rb_mGrpcCore = Qnil;
void Init_grpc() {
grpc_init();
ruby_vm_at_exit(grpc_rb_shutdown);
rb_mGoogle = rb_define_module("Google");
rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC");
rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core");
rb_mGRPC = rb_define_module("GRPC");
rb_mGrpcCore = rb_define_module_under(rb_mGRPC, "Core");
rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
"deadline", "metadata", NULL);
rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
Init_google_rpc_byte_buffer();
Init_google_rpc_event();
Init_google_rpc_channel();
Init_google_rpc_completion_queue();
Init_google_rpc_call();
Init_google_rpc_credentials();
Init_google_rpc_metadata();
Init_google_rpc_server();
Init_google_rpc_server_credentials();
Init_google_status_codes();
Init_google_time_consts();
Init_grpc_byte_buffer();
Init_grpc_event();
Init_grpc_channel();
Init_grpc_completion_queue();
Init_grpc_call();
Init_grpc_credentials();
Init_grpc_metadata();
Init_grpc_server();
Init_grpc_server_credentials();
Init_grpc_status_codes();
Init_grpc_time_consts();
}

@ -38,11 +38,8 @@
#include <ruby.h>
#include <grpc/support/time.h>
/* rb_mGoogle is the top-level Google module. */
extern VALUE rb_mGoogle;
/* rb_mGoogleRpcCore is the module containing the ruby wrapper GRPC classes. */
extern VALUE rb_mGoogleRpcCore;
/* rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */
extern VALUE rb_mGrpcCore;
/* Class used to wrap timeval structs. */
extern VALUE rb_cTimeVal;

@ -187,9 +187,9 @@ static VALUE grpc_rb_metadata_value(VALUE self) {
/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */
VALUE rb_cMetadata = Qnil;
void Init_google_rpc_metadata() {
void Init_grpc_metadata() {
rb_cMetadata =
rb_define_class_under(rb_mGoogleRpcCore, "Metadata", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "Metadata", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc);

@ -48,6 +48,6 @@ extern VALUE grpc_rb_metadata_create_with_mark(VALUE mark, grpc_metadata* md);
grpc_metadata* grpc_rb_get_wrapped_metadata(VALUE v);
/* Initializes the Metadata class. */
void Init_google_rpc_metadata();
void Init_grpc_metadata();
#endif /* GRPC_RB_METADATA_H_ */

@ -251,8 +251,8 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
return INT2NUM(recvd_port);
}
void Init_google_rpc_server() {
rb_cServer = rb_define_class_under(rb_mGoogleRpcCore, "Server", rb_cObject);
void Init_grpc_server() {
rb_cServer = rb_define_class_under(rb_mGrpcCore, "Server", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cServer, grpc_rb_server_alloc);

@ -42,7 +42,7 @@
extern VALUE rb_cServer;
/* Initializes the Server class. */
void Init_google_rpc_server();
void Init_grpc_server();
/* Gets the wrapped server from the ruby wrapper */
grpc_server* grpc_rb_get_wrapped_server(VALUE v);

@ -184,9 +184,9 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs,
grpc_server_credentials. */
VALUE rb_cServerCredentials = Qnil;
void Init_google_rpc_server_credentials() {
void Init_grpc_server_credentials() {
rb_cServerCredentials =
rb_define_class_under(rb_mGoogleRpcCore, "ServerCredentials", rb_cObject);
rb_define_class_under(rb_mGrpcCore, "ServerCredentials", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(rb_cServerCredentials, grpc_rb_server_credentials_alloc);

@ -42,7 +42,7 @@
extern VALUE rb_cServerCredentials;
/* Initializes the ruby ServerCredentials class. */
void Init_google_rpc_server_credentials();
void Init_grpc_server_credentials();
/* Gets the wrapped server_credentials from the ruby wrapper */
grpc_server_credentials* grpc_rb_get_wrapped_server_credentials(VALUE v);

@ -39,6 +39,3 @@ require 'grpc/generic/active_call'
require 'grpc/generic/client_stub'
require 'grpc/generic/service'
require 'grpc/generic/rpc_server'
# alias GRPC
GRPC = Google::RPC

@ -30,39 +30,37 @@
require 'faraday'
require 'grpc/auth/signet'
module Google
module RPC
# Module Auth provides classes that provide Google-specific authentication
# used to access Google gRPC services.
module Auth
# Extends Signet::OAuth2::Client so that the auth token is obtained from
# the GCE metadata server.
class GCECredentials < Signet::OAuth2::Client
COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\
'instance/service-accounts/default/token'
COMPUTE_CHECK_URI = 'http://metadata.google.internal'
module GRPC
# Module Auth provides classes that provide Google-specific authentication
# used to access Google gRPC services.
module Auth
# Extends Signet::OAuth2::Client so that the auth token is obtained from
# the GCE metadata server.
class GCECredentials < Signet::OAuth2::Client
COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\
'instance/service-accounts/default/token'
COMPUTE_CHECK_URI = 'http://metadata.google.internal'
# Detect if this appear to be a GCE instance, by checking if metadata
# is available
def self.on_gce?(options = {})
c = options[:connection] || Faraday.default_connection
resp = c.get(COMPUTE_CHECK_URI)
return false unless resp.status == 200
return false unless resp.headers.key?('Metadata-Flavor')
return resp.headers['Metadata-Flavor'] == 'Google'
rescue Faraday::ConnectionFailed
return false
end
# Detect if this appear to be a GCE instance, by checking if metadata
# is available
def self.on_gce?(options = {})
c = options[:connection] || Faraday.default_connection
resp = c.get(COMPUTE_CHECK_URI)
return false unless resp.status == 200
return false unless resp.headers.key?('Metadata-Flavor')
return resp.headers['Metadata-Flavor'] == 'Google'
rescue Faraday::ConnectionFailed
return false
end
# Overrides the super class method to change how access tokens are
# fetched.
def fetch_access_token(options = {})
c = options[:connection] || Faraday.default_connection
c.headers = { 'Metadata-Flavor' => 'Google' }
resp = c.get(COMPUTE_AUTH_TOKEN_URI)
Signet::OAuth2.parse_credentials(resp.body,
resp.headers['content-type'])
end
# Overrides the super class method to change how access tokens are
# fetched.
def fetch_access_token(options = {})
c = options[:connection] || Faraday.default_connection
c.headers = { 'Metadata-Flavor' => 'Google' }
resp = c.get(COMPUTE_AUTH_TOKEN_URI)
Signet::OAuth2.parse_credentials(resp.body,
resp.headers['content-type'])
end
end
end

@ -39,29 +39,27 @@ def read_json_key(json_key_io)
[json_key['private_key'], json_key['client_email']]
end
module Google
module RPC
# Module Auth provides classes that provide Google-specific authentication
# used to access Google gRPC services.
module Auth
# Authenticates requests using Google's Service Account credentials.
# (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount)
class ServiceAccountCredentials < Signet::OAuth2::Client
TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token'
AUDIENCE = TOKEN_CRED_URI
module GRPC
# Module Auth provides classes that provide Google-specific authentication
# used to access Google gRPC services.
module Auth
# Authenticates requests using Google's Service Account credentials.
# (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount)
class ServiceAccountCredentials < Signet::OAuth2::Client
TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token'
AUDIENCE = TOKEN_CRED_URI
# Initializes a ServiceAccountCredentials.
#
# @param scope [string|array] the scope(s) to access
# @param json_key_io [IO] an IO from which the JSON key can be read
def initialize(scope, json_key_io)
private_key, client_email = read_json_key(json_key_io)
super(token_credential_uri: TOKEN_CRED_URI,
audience: AUDIENCE,
scope: scope,
issuer: client_email,
signing_key: OpenSSL::PKey::RSA.new(private_key))
end
# Initializes a ServiceAccountCredentials.
#
# @param scope [string|array] the scope(s) to access
# @param json_key_io [IO] an IO from which the JSON key can be read
def initialize(scope, json_key_io)
private_key, client_email = read_json_key(json_key_io)
super(token_credential_uri: TOKEN_CRED_URI,
audience: AUDIENCE,
scope: scope,
issuer: client_email,
signing_key: OpenSSL::PKey::RSA.new(private_key))
end
end
end

@ -27,16 +27,17 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
module Google
module RPC
module Core
# Event is a class defined in the c extension
#
# Here, we add an inspect method.
class Event
def inspect
"<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
end
require 'grpc'
# GRPC contains the General RPC module.
module GRPC
module Core
# Event is a class defined in the c extension
#
# Here, we add an inspect method.
class Event
def inspect
"<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
end
end
end

@ -29,44 +29,43 @@
require 'grpc'
module Google
module RPC
module Core
# TimeConsts is a module from the C extension.
# GRPC contains the General RPC module.
module GRPC
module Core
# TimeConsts is a module from the C extension.
#
# Here it's re-opened to add a utility func.
module TimeConsts
# Converts a time delta to an absolute deadline.
#
# Here it's re-opened to add a utility func.
module TimeConsts
# Converts a time delta to an absolute deadline.
#
# Assumes timeish is a relative time, and converts its to an absolute,
# with following exceptions:
#
# * if timish is one of the TimeConsts.TimeSpec constants the value is
# preserved.
# * timish < 0 => TimeConsts.INFINITE_FUTURE
# * timish == 0 => TimeConsts.ZERO
#
# @param timeish [Number|TimeSpec]
# @return timeish [Number|TimeSpec]
def from_relative_time(timeish)
if timeish.is_a? TimeSpec
timeish
elsif timeish.nil?
TimeConsts::ZERO
elsif !timeish.is_a? Numeric
fail(TypeError,
"Cannot make an absolute deadline from #{timeish.inspect}")
elsif timeish < 0
TimeConsts::INFINITE_FUTURE
elsif timeish == 0
TimeConsts::ZERO
else
Time.now + timeish
end
# Assumes timeish is a relative time, and converts its to an absolute,
# with following exceptions:
#
# * if timish is one of the TimeConsts.TimeSpec constants the value is
# preserved.
# * timish < 0 => TimeConsts.INFINITE_FUTURE
# * timish == 0 => TimeConsts.ZERO
#
# @param timeish [Number|TimeSpec]
# @return timeish [Number|TimeSpec]
def from_relative_time(timeish)
if timeish.is_a? TimeSpec
timeish
elsif timeish.nil?
TimeConsts::ZERO
elsif !timeish.is_a? Numeric
fail(TypeError,
"Cannot make an absolute deadline from #{timeish.inspect}")
elsif timeish < 0
TimeConsts::INFINITE_FUTURE
elsif timeish == 0
TimeConsts::ZERO
else
Time.now + timeish
end
module_function :from_relative_time
end
module_function :from_relative_time
end
end
end

@ -29,35 +29,33 @@
require 'grpc'
module Google
# Google::RPC contains the General RPC module.
module RPC
# OutOfTime is an exception class that indicates that an RPC exceeded its
# deadline.
OutOfTime = Class.new(StandardError)
# GRPC contains the General RPC module.
module GRPC
# OutOfTime is an exception class that indicates that an RPC exceeded its
# deadline.
OutOfTime = Class.new(StandardError)
# BadStatus is an exception class that indicates that an error occurred at
# either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
class BadStatus < StandardError
attr_reader :code, :details
# BadStatus is an exception class that indicates that an error occurred at
# either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
class BadStatus < StandardError
attr_reader :code, :details
# @param code [Numeric] the status code
# @param details [String] the details of the exception
def initialize(code, details = 'unknown cause')
super("#{code}:#{details}")
@code = code
@details = details
end
# @param code [Numeric] the status code
# @param details [String] the details of the exception
def initialize(code, details = 'unknown cause')
super("#{code}:#{details}")
@code = code
@details = details
end
# Converts the exception to a GRPC::Status for use in the networking
# wrapper layer.
#
# @return [Status] with the same code and details
def to_status
Status.new(code, details)
end
# Converts the exception to a GRPC::Status for use in the networking
# wrapper layer.
#
# @return [Status] with the same code and details
def to_status
Status.new(code, details)
end
end
end

@ -36,502 +36,500 @@ def assert_event_type(ev, want)
fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
end
module Google
# Google::RPC contains the General RPC module.
module RPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
attr_reader(:deadline)
# client_invoke begins a client invocation.
#
# Flow Control note: this blocks until flow control accepts that client
# request can go ahead.
#
# deadline is the absolute deadline for the call.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline
def self.client_invoke(call, q, _deadline, **kw)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
client_metadata_read = Object.new
finished_tag = Object.new
call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
end
# Creates an ActiveCall.
#
# ActiveCall should only be created after a call is accepted. That
# means different things on a client and a server. On the client, the
# call is accepted after calling call.invoke. On the server, this is
# after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
attr_reader(:deadline)
# client_invoke begins a client invocation.
#
# Flow Control note: this blocks until flow control accepts that client
# request can go ahead.
#
# deadline is the absolute deadline for the call.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
# @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline
def self.client_invoke(call, q, _deadline, **kw)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
client_metadata_read = Object.new
finished_tag = Object.new
call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
end
# Obtains the status of the call.
#
# this value is nil until the call completes
# @return this call's status
def status
@call.status
# Creates an ActiveCall.
#
# ActiveCall should only be created after a call is accepted. That
# means different things on a client and a server. On the client, the
# call is accepted after calling call.invoke. On the server, this is
# after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
end
# Obtains the metadata of the call.
#
# At the start of the call this will be nil. During the call this gets
# some values as soon as the other end of the connection acknowledges the
# request.
#
# @return this calls's metadata
def metadata
@call.metadata
end
# Obtains the status of the call.
#
# this value is nil until the call completes
# @return this call's status
def status
@call.status
end
# Cancels the call.
#
# Cancels the call. The call does not return any result, but once this it
# has been called, the call should eventually terminate. Due to potential
# races between the execution of the cancel and the in-flight request, the
# result of the call after calling #cancel is indeterminate:
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that
# was happening
def cancel
@call.cancel
end
# Obtains the metadata of the call.
#
# At the start of the call this will be nil. During the call this gets
# some values as soon as the other end of the connection acknowledges the
# request.
#
# @return this calls's metadata
def metadata
@call.metadata
end
# indicates if the call is shutdown
def shutdown
@shutdown ||= false
end
# Cancels the call.
#
# Cancels the call. The call does not return any result, but once this it
# has been called, the call should eventually terminate. Due to potential
# races between the execution of the cancel and the in-flight request, the
# result of the call after calling #cancel is indeterminate:
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that
# was happening
def cancel
@call.cancel
end
# indicates if the call is cancelled.
def cancelled
@cancelled ||= false
end
# indicates if the call is shutdown
def shutdown
@shutdown ||= false
end
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
MultiReqView.new(self)
end
# indicates if the call is cancelled.
def cancelled
@cancelled ||= false
end
# single_req_view provides a restricted view of this ActiveCall for use in
# a server request-response handler.
def single_req_view
SingleReqView.new(self)
end
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
MultiReqView.new(self)
end
# operation provides a restricted view of this ActiveCall for use as
# a Operation.
def operation
Operation.new(self)
end
# single_req_view provides a restricted view of this ActiveCall for use in
# a server request-response handler.
def single_req_view
SingleReqView.new(self)
end
# writes_done indicates that all writes are completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event, unless assert_finished is set to false. Any calls to
# #remote_send after this call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close
end
# operation provides a restricted view of this ActiveCall for use as
# a Operation.
def operation
Operation.new(self)
end
return unless assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
fail 'unexpected nil event' if ev.nil?
# writes_done indicates that all writes are completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event, unless assert_finished is set to false. Any calls to
# #remote_send after this call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close
@call.status
end
# finished waits until the call is completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event.
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@call.metadata.merge!(ev.result.metadata)
end
if ev.result.code != Core::StatusCodes::OK
fail BadStatus.new(ev.result.code, ev.result.details)
end
res = ev.result
ensure
ev.close
end
res
end
return unless assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
fail 'unexpected nil event' if ev.nil?
ev.close
@call.status
end
# remote_send sends a request to the remote endpoint.
#
# It blocks until the remote endpoint acknowledges by sending a
# WRITE_ACCEPTED. req can be marshalled already.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
assert_queue_is_ready
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
if marshalled
payload = req
# finished waits until the call is completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event.
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
payload = @marshal.call(req)
end
@call.start_write(Core::ByteBuffer.new(payload), self)
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
@call.metadata.merge!(ev.result.metadata)
end
end
# send_status sends a status to the remote endpoint
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def send_status(code = OK, details = '', assert_finished = false)
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
if ev.result.code != Core::StatusCodes::OK
fail BadStatus.new(ev.result.code, ev.result.details)
end
logger.debug("Status sent: #{code}:'#{details}'")
return finished if assert_finished
nil
res = ev.result
ensure
ev.close
end
res
end
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint sends a READ or FINISHED event. On
# a READ, it returns the response after unmarshalling it. On
# FINISHED, it returns nil if the status is OK, otherwise raising
# BadStatus
def remote_read
if @call.metadata.nil? && !@read_metadata_tag.nil?
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
assert_event_type(ev, CLIENT_METADATA_READ)
@call.metadata = ev.result
@read_metadata_tag = nil
end
# remote_send sends a request to the remote endpoint.
#
# It blocks until the remote endpoint acknowledges by sending a
# WRITE_ACCEPTED. req can be marshalled already.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
assert_queue_is_ready
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
if marshalled
payload = req
else
payload = @marshal.call(req)
end
@call.start_write(Core::ByteBuffer.new(payload), self)
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
end
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
unless ev.result.nil?
logger.debug("received req.to_s: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
ensure
ev.close
end
logger.debug('found nil; the final response has been sent')
nil
# send_status sends a status to the remote endpoint
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def send_status(code = OK, details = '', assert_finished = false)
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("Status sent: #{code}:'#{details}'")
return finished if assert_finished
nil
end
# each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint sends a READ or FINISHED event. On
# a READ, it returns the response after unmarshalling it. On
# FINISHED, it returns nil if the status is OK, otherwise raising
# BadStatus
def remote_read
if @call.metadata.nil? && !@read_metadata_tag.nil?
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
assert_event_type(ev, CLIENT_METADATA_READ)
@call.metadata = ev.result
@read_metadata_tag = nil
end
# each_remote_read_then_finish passes each response to the given block or
# returns an enumerator of the responses if no block is given.
#
# It is like each_remote_read, but it blocks on finishing on detecting
# the final message.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
end
yield resp
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
unless ev.result.nil?
logger.debug("received req.to_s: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
ensure
ev.close
end
logger.debug('found nil; the final response has been sent')
nil
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @return [Object] the response received from the server
def request_response(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
response = remote_read
finished unless response.is_a? Struct::Status
response
# each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Object] the response received from the server
def client_streamer(requests, **kw)
start_call(**kw) unless @started
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
finished unless response.is_a? Struct::Status
response
# each_remote_read_then_finish passes each response to the given block or
# returns an enumerator of the responses if no block is given.
#
# It is like each_remote_read, but it blocks on finishing on detecting
# the final message.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
end
yield resp
end
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
# any keyword arguments are treated as metadata to be sent to the server.
#
# @param req [Object] the request sent to the server
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param req [Object] the request sent to the server
# @return [Object] the response received from the server
def request_response(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
response = remote_read
finished unless response.is_a? Struct::Status
response
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd.run_on_client(requests, &blk)
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Object] the response received from the server
def client_streamer(requests, **kw)
start_call(**kw) unless @started
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
finished unless response.is_a? Struct::Status
response
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd.run_on_server(gen_each_reply)
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
# any keyword arguments are treated as metadata to be sent to the server.
#
# @param req [Object] the request sent to the server
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, **kw)
start_call(**kw) unless @started
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param requests [Object] an Enumerable of requests to send
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd.run_on_client(requests, &blk)
end
private
# run_server_bidi orchestrates a BiDi stream processing on a server.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd.run_on_server(gen_each_reply)
end
def start_call(**kw)
tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@finished_tag, @read_metadata_tag = tags
@started = true
end
private
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
def_delegators :@wrapped, *visible_methods
def start_call(**kw)
tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@finished_tag, @read_metadata_tag = tags
@started = true
end
# @param wrapped [ActiveCall] the call whose methods are shielded
def initialize(wrapped)
@wrapped = wrapped
end
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
def_delegators :@wrapped, *visible_methods
# @param wrapped [ActiveCall] the call whose methods are shielded
def initialize(wrapped)
@wrapped = wrapped
end
end
end
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
SingleReqView = view_class(:cancelled, :deadline)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
:each_remote_read)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status)
# confirms that no events are enqueued, and that the queue is not
# shutdown.
def assert_queue_is_ready
ev = nil
begin
ev = @cq.pluck(self, ZERO)
fail "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
logging.debug('timed out waiting for next event')
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
ensure
ev.close unless ev.nil?
end
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
SingleReqView = view_class(:cancelled, :deadline)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
:each_remote_read)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status)
# confirms that no events are enqueued, and that the queue is not
# shutdown.
def assert_queue_is_ready
ev = nil
begin
ev = @cq.pluck(self, ZERO)
fail "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
logging.debug('timed out waiting for next event')
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
ensure
ev.close unless ev.nil?
end
end
end

@ -36,186 +36,184 @@ def assert_event_type(ev, want)
fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
end
module Google
# Google::RPC contains the General RPC module.
module RPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
# Creates a BidiCall.
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.invoke. On the server, this is after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the BidiCall#run is called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
# Creates a BidiCall.
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.invoke. On the server, this is after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the BidiCall#run is called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param q [CompletionQueue] the completion queue used to accept
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
end
# Begins orchestration of the Bidi stream for a client sending requests.
#
# The method either returns an Enumerator of the responses, or accepts a
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
enq_th = start_write_loop(requests)
loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
enq_th.join
loop_th.join
end
# Begins orchestration of the Bidi stream for a client sending requests.
#
# The method either returns an Enumerator of the responses, or accepts a
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
enq_th = start_write_loop(requests)
loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
enq_th.join
loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
enq_th = start_write_loop(replys, is_client: false)
loop_th = start_read_loop
loop_th.join
enq_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
enq_th = start_write_loop(replys, is_client: false)
loop_th = start_read_loop
loop_th.join
enq_th.join
end
private
private
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop(requests, is_client: true)
Thread.new do # TODO: run on a thread pool
write_tag = Object.new
begin
count = 0
requests.each do |req|
count += 1
payload = @marshal.call(req)
@call.start_write(Core::ByteBuffer.new(payload), write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop(requests, is_client: true)
Thread.new do # TODO: run on a thread pool
write_tag = Object.new
begin
count = 0
requests.each do |req|
count += 1
payload = @marshal.call(req)
@call.start_write(Core::ByteBuffer.new(payload), write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
if is_client
@call.writes_done(write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISHED)
ensure
ev.close
end
logger.debug('bidi-client: finished received')
end
if is_client
@call.writes_done(write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
rescue StandardError => e
logger.warn('bidi: write_loop failed')
logger.warn(e)
logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISHED)
ensure
ev.close
end
logger.debug('bidi-client: finished received')
end
rescue StandardError => e
logger.warn('bidi: write_loop failed')
logger.warn(e)
end
end
end
# starts the read loop
def start_read_loop
Thread.new do
begin
read_tag = Object.new
count = 0
# queue the initial read before beginning the loop
loop do
logger.debug("waiting for read #{count}")
count += 1
@call.start_read(read_tag)
ev = @cq.pluck(read_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
# starts the read loop
def start_read_loop
Thread.new do
begin
read_tag = Object.new
count = 0
# handle the next event.
if ev.result.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
end
# queue the initial read before beginning the loop
loop do
logger.debug("waiting for read #{count}")
count += 1
@call.start_read(read_tag)
ev = @cq.pluck(read_tag, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
# push the latest read onto the queue and continue reading
logger.debug("received req: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
ensure
ev.close
# handle the next event.
if ev.result.nil?
@readq.push(END_OF_READS)
logger.debug('done reading!')
break
end
end
rescue StandardError => e
logger.warn('bidi: read_loop failed')
logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
# push the latest read onto the queue and continue reading
logger.debug("received req: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
@readq.push(res)
ensure
ev.close
end
end
rescue StandardError => e
logger.warn('bidi: read_loop failed')
logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
end
end

@ -30,381 +30,379 @@
require 'grpc/generic/active_call'
require 'xray/thread_dump_signal_handler'
module Google
# Google::RPC contains the General RPC module.
module RPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
# GRPC contains the General RPC module.
module GRPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
# Default deadline is 5 seconds.
DEFAULT_DEADLINE = 5
# Default deadline is 5 seconds.
DEFAULT_DEADLINE = 5
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
# it wishes to access, e.g.,
#
# my_stub = ClientStub.new(example.host.com:50505)
#
# Any arbitrary keyword arguments are treated as channel arguments used to
# configure the RPC connection to the host.
#
# There are some specific keyword args that are not used to configure the
# channel:
#
# - :channel_override
# when present, this must be a pre-created GRPC::Channel. If it's
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
# - :deadline
# when present, this is the default deadline used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
# it can be used to update metadata, i.e, remove, change or update
# amend metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override:nil,
deadline: DEFAULT_DEADLINE,
creds: nil,
update_metadata: nil,
**kw)
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@queue = q
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
# it wishes to access, e.g.,
#
# my_stub = ClientStub.new(example.host.com:50505)
#
# Any arbitrary keyword arguments are treated as channel arguments used to
# configure the RPC connection to the host.
#
# There are some specific keyword args that are not used to configure the
# channel:
#
# - :channel_override
# when present, this must be a pre-created GRPC::Channel. If it's
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
# - :deadline
# when present, this is the default deadline used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
# it can be used to update metadata, i.e, remove, change or update
# amend metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
# @param deadline [Number] the default deadline to use in requests
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override:nil,
deadline: DEFAULT_DEADLINE,
creds: nil,
update_metadata: nil,
**kw)
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@queue = q
# set the channel instance
if !channel_override.nil?
ch = channel_override
fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
# set the channel instance
if !channel_override.nil?
ch = channel_override
fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
else
if creds.nil?
ch = Core::Channel.new(host, kw)
elsif !creds.is_a?(Core::Credentials)
fail(ArgumentError, 'not a Credentials')
else
if creds.nil?
ch = Core::Channel.new(host, kw)
elsif !creds.is_a?(Core::Credentials)
fail(ArgumentError, 'not a Credentials')
else
ch = Core::Channel.new(host, kw, creds)
end
ch = Core::Channel.new(host, kw, creds)
end
@ch = ch
@update_metadata = nil
unless update_metadata.nil?
unless update_metadata.is_a? Proc
fail(ArgumentError, 'update_metadata is not a Proc')
end
@update_metadata = update_metadata
end
@host = host
@deadline = deadline
end
@ch = ch
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * the requests is sent only when GRPC core's flow control allows it to
# be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call returns the response
#
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.request_response(req, **md) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, **md)
@update_metadata = nil
unless update_metadata.nil?
unless update_metadata.is_a? Proc
fail(ArgumentError, 'update_metadata is not a Proc')
end
op
@update_metadata = update_metadata
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * each requests is sent only when GRPC core's flow control allows it to
# be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call consumes the requests and returns
# the response.
#
# If return_op is true, the call returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.client_streamer(requests, **md) unless return_op
@host = host
@deadline = deadline
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * the requests is sent only when GRPC core's flow control allows it to
# be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call returns the response
#
# If return_op is true, the call returns an Operation, calling execute
# on the Operation returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.request_response(req, **md) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
op = c.operation
op.define_singleton_method(:execute) do
c.client_streamer(requests, **md)
end
op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, **md)
end
op
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# == Flow Control ==
# This is a blocking call.
#
# * the request is sent only when GRPC core's flow control allows it to
# be sent.
#
# * the request will not complete until the server sends the final
# response followed by a status message.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs server streamer call. Again, Operation#execute either
# calls the given block with each response or returns an Enumerator of the
# responses.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.server_streamer(req, **md, &blk) unless return_op
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# == Flow Control ==
# This is a blocking call.
#
# * it does not return until a response is received.
#
# * each requests is sent only when GRPC core's flow control allows it to
# be sent.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status
#
# * the deadline is exceeded
#
# == Return Value ==
#
# If return_op is false, the call consumes the requests and returns
# the response.
#
# If return_op is true, the call returns the response.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.client_streamer(requests, **md) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.server_streamer(req, **md, &blk)
end
op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
op = c.operation
op.define_singleton_method(:execute) do
c.client_streamer(requests, **md)
end
op
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Flow Control ==
# This is a blocking call.
#
# * the call completes when the next call to provided block returns
# * [False]
#
# * the execution block parameters are two objects for sending and
# receiving responses, each of which blocks waiting for flow control.
# E.g, calles to bidi_call#remote_send will wait until flow control
# allows another write before returning; and obviously calls to
# responses#next block until the next response is available.
#
# == Termination ==
#
# As well as sending and receiving messages, the block passed to the
# function is also responsible for:
#
# * calling bidi_call#writes_done to indicate no further reqs will be
# sent.
#
# * returning false if once the bidi stream is functionally completed.
#
# Note that response#next will indicate that there are no further
# responses by throwing StopIteration, but can only happen either
# if bidi_call#writes_done is called.
#
# To terminate the RPC correctly the block:
#
# * must call bidi#writes_done and then
#
# * either return false as soon as there is no need for other responses
#
# * loop on responses#next until no further responses are available
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs the Bidi call. Again, Operation#execute either calls a
# given block with each response or returns an Enumerator of the
# responses.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# == Flow Control ==
# This is a blocking call.
#
# * the request is sent only when GRPC core's flow control allows it to
# be sent.
#
# * the request will not complete until the server sends the final
# response followed by a status message.
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs server streamer call. Again, Operation#execute either
# calls the given block with each response or returns an Enumerator of the
# responses.
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# @param method [String] the RPC method to call on the GRPC server
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.server_streamer(req, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.bidi_streamer(requests, **md, &blk)
end
op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.server_streamer(req, **md, &blk)
end
op
end
private
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# == Flow Control ==
# This is a blocking call.
#
# * the call completes when the next call to provided block returns
# * [False]
#
# * the execution block parameters are two objects for sending and
# receiving responses, each of which blocks waiting for flow control.
# E.g, calles to bidi_call#remote_send will wait until flow control
# allows another write before returning; and obviously calls to
# responses#next block until the next response is available.
#
# == Termination ==
#
# As well as sending and receiving messages, the block passed to the
# function is also responsible for:
#
# * calling bidi_call#writes_done to indicate no further reqs will be
# sent.
#
# * returning false if once the bidi stream is functionally completed.
#
# Note that response#next will indicate that there are no further
# responses by throwing StopIteration, but can only happen either
# if bidi_call#writes_done is called.
#
# To terminate the RPC correctly the block:
#
# * must call bidi#writes_done and then
#
# * either return false as soon as there is no need for other responses
#
# * loop on responses#next until no further responses are available
#
# == Errors ==
# An RuntimeError is raised if
#
# * the server responds with a non-OK status when any response is
# * retrieved
#
# * the deadline is exceeded
#
#
# == Keyword Args ==
#
# Unspecified keyword arguments are treated as metadata to be sent to the
# server.
#
# == Return Value ==
#
# if the return_op is false, the return value is an Enumerator of the
# results, unless a block is provided, in which case the block is
# executed with each response.
#
# if return_op is true, the function returns an Operation whose #execute
# method runs the Bidi call. Again, Operation#execute either calls a
# given block with each response or returns an Enumerator of the
# responses.
#
# @param method [String] the RPC method to call on the GRPC server
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
return c.bidi_streamer(requests, **md, &blk) unless return_op
# Creates a new active stub
#
# @param ch [GRPC::Channel] the channel used to create the stub.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [TimeConst]
def new_active_call(ch, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
call = @ch.create_call(ch, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
op = c.operation
op.define_singleton_method(:execute) do
c.bidi_streamer(requests, **md, &blk)
end
op
end
private
# Creates a new active stub
#
# @param ch [GRPC::Channel] the channel used to create the stub.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [TimeConst]
def new_active_call(ch, marshal, unmarshal, deadline = nil)
absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
call = @ch.create_call(ch, @host, absolute_deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
started: false)
end
end
end

@ -29,123 +29,122 @@
require 'grpc/grpc'
module Google
module RPC
# RpcDesc is a Descriptor of an RPC method.
class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
:unmarshal_method)
include Core::StatusCodes
# GRPC contains the General RPC module.
module GRPC
# RpcDesc is a Descriptor of an RPC method.
class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
:unmarshal_method)
include Core::StatusCodes
# Used to wrap a message class to indicate that it needs to be streamed.
class Stream
attr_accessor :type
# Used to wrap a message class to indicate that it needs to be streamed.
class Stream
attr_accessor :type
def initialize(type)
@type = type
end
def initialize(type)
@type = type
end
end
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
proc { |o| o.class.method(marshal_method).call(o).to_s }
end
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
proc { |o| o.class.method(marshal_method).call(o).to_s }
end
# @param [:input, :output] target determines whether to produce the an
# unmarshal Proc for the rpc input parameter or
# its output parameter
#
# @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
def unmarshal_proc(target)
fail ArgumentError unless [:input, :output].include?(target)
unmarshal_class = method(target).call
unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
# @param [:input, :output] target determines whether to produce the an
# unmarshal Proc for the rpc input parameter or
# its output parameter
#
# @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
def unmarshal_proc(target)
fail ArgumentError unless [:input, :output].include?(target)
unmarshal_class = method(target).call
unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
req = active_call.remote_read
resp = mth.call(req, active_call.single_req_view)
active_call.remote_send(resp)
elsif client_streamer?
resp = mth.call(active_call.multi_req_view)
active_call.remote_send(resp)
elsif server_streamer?
req = active_call.remote_read
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
else # is a bidi_stream
active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK')
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue Core::EventError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
req = active_call.remote_read
resp = mth.call(req, active_call.single_req_view)
active_call.remote_send(resp)
elsif client_streamer?
resp = mth.call(active_call.multi_req_view)
active_call.remote_send(resp)
elsif server_streamer?
req = active_call.remote_read
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
else # is a bidi_stream
active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK')
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue Core::EventError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
def assert_arity_matches(mth)
if request_response? || server_streamer?
if mth.arity != 2
fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
end
else
if mth.arity != 1
fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
def assert_arity_matches(mth)
if request_response? || server_streamer?
if mth.arity != 2
fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
end
else
if mth.arity != 1
fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
end
end
def request_response?
!input.is_a?(Stream) && !output.is_a?(Stream)
end
def request_response?
!input.is_a?(Stream) && !output.is_a?(Stream)
end
def client_streamer?
input.is_a?(Stream) && !output.is_a?(Stream)
end
def client_streamer?
input.is_a?(Stream) && !output.is_a?(Stream)
end
def server_streamer?
!input.is_a?(Stream) && output.is_a?(Stream)
end
def server_streamer?
!input.is_a?(Stream) && output.is_a?(Stream)
end
def bidi_streamer?
input.is_a?(Stream) && output.is_a?(Stream)
end
def bidi_streamer?
input.is_a?(Stream) && output.is_a?(Stream)
end
def arity_error(mth, want, msg)
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end
def arity_error(mth, want, msg)
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end
def send_status(active_client, code, details)
details = 'Not sure why' if details.nil?
active_client.send_status(code, details)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
end
def send_status(active_client, code, details)
details = 'Not sure why' if details.nil?
active_client.send_status(code, details)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
end
end
end

@ -33,372 +33,370 @@ require 'grpc/generic/service'
require 'thread'
require 'xray/thread_dump_signal_handler'
module Google
# Google::RPC contains the General RPC module.
module RPC
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
include Core::CompletionType
include Core::TimeConsts
extend ::Forwardable
def_delegators :@server, :add_http2_port
# Default thread pool size is 3
DEFAULT_POOL_SIZE = 3
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
#
# There are some specific keyword args used to configure the RpcServer
# instance, however other arbitrary are allowed and when present are used
# to configure the listeninng connection set up by the RpcServer.
#
# * server_override: which if passed must be a [GRPC::Core::Server]. When
# present.
#
# * poll_period: when present, the server polls for new events with this
# period
#
# * pool_size: the size of the thread pool the server uses to run its
# threads
#
# * completion_queue_override: when supplied, this will be used as the
# completion_queue that the server uses to receive network events,
# otherwise its creates a new instance itself
#
# * creds: [GRPC::Core::ServerCredentials]
# the credentials used to secure the server
#
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:INFINITE_FUTURE,
completion_queue_override:nil,
creds:nil,
server_override:nil,
**kw)
if completion_queue_override.nil?
cq = Core::CompletionQueue.new
else
cq = completion_queue_override
unless cq.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
# GRPC contains the General RPC module.
module GRPC
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
include Core::CompletionType
include Core::TimeConsts
extend ::Forwardable
def_delegators :@server, :add_http2_port
# Default thread pool size is 3
DEFAULT_POOL_SIZE = 3
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
#
# There are some specific keyword args used to configure the RpcServer
# instance, however other arbitrary are allowed and when present are used
# to configure the listeninng connection set up by the RpcServer.
#
# * server_override: which if passed must be a [GRPC::Core::Server]. When
# present.
#
# * poll_period: when present, the server polls for new events with this
# period
#
# * pool_size: the size of the thread pool the server uses to run its
# threads
#
# * completion_queue_override: when supplied, this will be used as the
# completion_queue that the server uses to receive network events,
# otherwise its creates a new instance itself
#
# * creds: [GRPC::Core::ServerCredentials]
# the credentials used to secure the server
#
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:INFINITE_FUTURE,
completion_queue_override:nil,
creds:nil,
server_override:nil,
**kw)
if completion_queue_override.nil?
cq = Core::CompletionQueue.new
else
cq = completion_queue_override
unless cq.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
@cq = cq
end
@cq = cq
if server_override.nil?
if creds.nil?
srv = Core::Server.new(@cq, kw)
elsif !creds.is_a? Core::ServerCredentials
fail(ArgumentError, 'not a ServerCredentials')
else
srv = Core::Server.new(@cq, kw, creds)
end
if server_override.nil?
if creds.nil?
srv = Core::Server.new(@cq, kw)
elsif !creds.is_a? Core::ServerCredentials
fail(ArgumentError, 'not a ServerCredentials')
else
srv = server_override
fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
srv = Core::Server.new(@cq, kw, creds)
end
@server = srv
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@run_mutex = Mutex.new
@run_cond = ConditionVariable.new
@pool = Pool.new(@pool_size)
else
srv = server_override
fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
end
@server = srv
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@run_mutex = Mutex.new
@run_cond = ConditionVariable.new
@pool = Pool.new(@pool_size)
end
# stops a running server
#
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
return unless @running
@stopped = true
@pool.stop
end
# stops a running server
#
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
return unless @running
@stopped = true
@pool.stop
end
# determines if the server is currently running
def running?
@running ||= false
end
# determines if the server is currently running
def running?
@running ||= false
end
# Is called from other threads to wait for #run to start up the server.
#
# If run has not been called, this returns immediately.
#
# @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = 0.1)
end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
while Time.now < end_time
@run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
sleep(sleep_period)
end
running?
# Is called from other threads to wait for #run to start up the server.
#
# If run has not been called, this returns immediately.
#
# @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = 0.1)
end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
while Time.now < end_time
@run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
sleep(sleep_period)
end
running?
end
# determines if the server is currently stopped
def stopped?
@stopped ||= false
end
# determines if the server is currently stopped
def stopped?
@stopped ||= false
end
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
# #new function can be called without argument or any instance of such a
# class.
#
# E.g, after
#
# class Divider
# include GRPC::GenericService
# rpc :div DivArgs, DivReply # single request, single response
# def initialize(optional_arg='default option') # no args
# ...
# end
#
# srv = GRPC::RpcServer.new(...)
#
# # Either of these works
#
# srv.handle(Divider)
#
# # or
#
# srv.handle(Divider.new('replace optional arg'))
#
# It raises RuntimeError:
# - if service is not valid service class or object
# - its handler methods are already registered
# - if the server is already running
#
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
fail 'cannot add services if the server is running' if running?
fail 'cannot add services if the server is stopped' if stopped?
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
end
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
# #new function can be called without argument or any instance of such a
# class.
#
# E.g, after
#
# class Divider
# include GRPC::GenericService
# rpc :div DivArgs, DivReply # single request, single response
# def initialize(optional_arg='default option') # no args
# ...
# end
#
# srv = GRPC::RpcServer.new(...)
#
# # Either of these works
#
# srv.handle(Divider)
#
# # or
#
# srv.handle(Divider.new('replace optional arg'))
#
# It raises RuntimeError:
# - if service is not valid service class or object
# - its handler methods are already registered
# - if the server is already running
#
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
fail 'cannot add services if the server is running' if running?
fail 'cannot add services if the server is stopped' if stopped?
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
end
# runs the server
#
# - if no rpc_descs are registered, this exits immediately, otherwise it
# continues running permanently and does not return until program exit.
#
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def run
if rpc_descs.size == 0
logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@running = true
@run_cond.signal
# runs the server
#
# - if no rpc_descs are registered, this exits immediately, otherwise it
# continues running permanently and does not return until program exit.
#
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def run
if rpc_descs.size == 0
logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@running = true
@run_cond.signal
end
@pool.start
@server.start
server_tag = Object.new
until stopped?
@server.request_call(server_tag)
ev = @cq.pluck(server_tag, @poll_period)
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end
@pool.start
@server.start
server_tag = Object.new
until stopped?
@server.request_call(server_tag)
ev = @cq.pluck(server_tag, @poll_period)
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
ev.close
next
end
c = new_active_server_call(ev.call, ev.result)
unless c.nil?
mth = ev.result.method.to_sym
ev.close
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
c = new_active_server_call(ev.call, ev.result)
unless c.nil?
mth = ev.result.method.to_sym
ev.close
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
end
@running = false
end
@running = false
end
def new_active_server_call(call, new_server_rpc)
# Accept the call. This is necessary even if a status is to be sent
# back immediately
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata
call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata
# Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
if @pool.jobs_waiting > @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::UNAVAILABLE, '')
return nil
end
# Send NOT_FOUND if the method does not exist
mth = new_server_rpc.method.to_sym
unless rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{new_server_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '')
return nil
end
def new_active_server_call(call, new_server_rpc)
# Accept the call. This is necessary even if a status is to be sent
# back immediately
finished_tag = Object.new
call_queue = Core::CompletionQueue.new
call.metadata = new_server_rpc.metadata # store the metadata
call.server_accept(call_queue, finished_tag)
call.server_end_initial_metadata
# Send UNAVAILABLE if there are too many unprocessed jobs
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
if @pool.jobs_waiting > @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::UNAVAILABLE, '')
return nil
end
# Create the ActiveCall
rpc_desc = rpc_descs[mth]
logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
ActiveCall.new(call, call_queue,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
new_server_rpc.deadline, finished_tag: finished_tag)
# Send NOT_FOUND if the method does not exist
mth = new_server_rpc.method.to_sym
unless rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{new_server_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(call, call_queue, noop, noop,
new_server_rpc.deadline,
finished_tag: finished_tag)
c.send_status(StatusCodes::NOT_FOUND, '')
return nil
end
# Pool is a simple thread pool for running server requests.
class Pool
def initialize(size)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
end
# Create the ActiveCall
rpc_desc = rpc_descs[mth]
logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
ActiveCall.new(call, call_queue,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
new_server_rpc.deadline, finished_tag: finished_tag)
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
# Pool is a simple thread pool for running server requests.
class Pool
def initialize(size)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
@jobs << [blk, args]
end
# Starts running the jobs in the thread pool.
def start
fail 'already stopped' if @stopped
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
loop do
begin
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
end
# Starts running the jobs in the thread pool.
def start
fail 'already stopped' if @stopped
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
loop do
begin
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
end
end
end
# removes the threads from workers, and signal when all the
# threads are complete.
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0
end
# removes the threads from workers, and signal when all the
# threads are complete.
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0
end
@workers << next_thread
end
@workers << next_thread
end
end
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
# TODO: allow configuration of the keepalive period
keep_alive = 5
@stop_mutex.synchronize do
@stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
end
# TODO: allow configuration of the keepalive period
keep_alive = 5
@stop_mutex.synchronize do
@stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
end
# Forcibly shutdown any threads that are still alive.
if @workers.size > 0
logger.warn("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
end
# Forcibly shutdown any threads that are still alive.
if @workers.size > 0
logger.warn("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
end
end
logger.info('stopped, all workers are shutdown')
end
logger.info('stopped, all workers are shutdown')
end
end
protected
protected
def rpc_descs
@rpc_descs ||= {}
end
def rpc_descs
@rpc_descs ||= {}
end
def rpc_handlers
@rpc_handlers ||= {}
end
def rpc_handlers
@rpc_handlers ||= {}
end
private
private
def assert_valid_service_class(cls)
unless cls.include?(GenericService)
fail "#{cls} should 'include GenericService'"
end
if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions"
end
cls.assert_rpc_descs_have_methods
def assert_valid_service_class(cls)
unless cls.include?(GenericService)
fail "#{cls} should 'include GenericService'"
end
if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions"
end
cls.assert_rpc_descs_have_methods
end
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
specs = rpc_descs
handlers = rpc_handlers
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
if specs.key? route
fail "Cannot add rpc #{route} from #{spec}, already registered"
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
specs = rpc_descs
handlers = rpc_handlers
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
if specs.key? route
fail "Cannot add rpc #{route} from #{spec}, already registered"
else
specs[route] = spec
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
specs[route] = spec
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
handlers[route] = service.method(name.to_s.underscore.to_sym)
end
logger.info("handling #{route} with #{handlers[route]}")
handlers[route] = service.method(name.to_s.underscore.to_sym)
end
logger.info("handling #{route} with #{handlers[route]}")
end
end
end

@ -48,188 +48,186 @@ class String
end
end
module Google
# Google::RPC contains the General RPC module.
module RPC
# Provides behaviour used to implement schema-derived service classes.
#
# Is intended to be used to support both client and server
# IDL-schema-derived servers.
module GenericService
# Used to indicate that a name has already been specified
class DuplicateRpcName < StandardError
def initialize(name)
super("rpc (#{name}) is already defined")
end
# GRPC contains the General RPC module.
module GRPC
# Provides behaviour used to implement schema-derived service classes.
#
# Is intended to be used to support both client and server
# IDL-schema-derived servers.
module GenericService
# Used to indicate that a name has already been specified
class DuplicateRpcName < StandardError
def initialize(name)
super("rpc (#{name}) is already defined")
end
end
# Provides a simple DSL to describe RPC services.
# Provides a simple DSL to describe RPC services.
#
# E.g, a Maths service that uses the serializable messages DivArgs,
# DivReply and Num might define its endpoint uses the following way:
#
# rpc :div DivArgs, DivReply # single request, single response
# rpc :sum stream(Num), Num # streamed input, single response
# rpc :fib FibArgs, stream(Num) # single request, streamed response
# rpc :div_many stream(DivArgs), stream(DivReply)
# # streamed req and resp
#
# Each 'rpc' adds an RpcDesc to classes including this module, and
# #assert_rpc_descs_have_methods is used to ensure the including class
# provides methods with signatures that support all the descriptors.
module Dsl
# This configures the method names that the serializable message
# implementation uses to marshal and unmarshal messages.
#
# E.g, a Maths service that uses the serializable messages DivArgs,
# DivReply and Num might define its endpoint uses the following way:
# - unmarshal_class method must be a class method on the serializable
# message type that takes a string (byte stream) and produces and object
#
# rpc :div DivArgs, DivReply # single request, single response
# rpc :sum stream(Num), Num # streamed input, single response
# rpc :fib FibArgs, stream(Num) # single request, streamed response
# rpc :div_many stream(DivArgs), stream(DivReply)
# # streamed req and resp
# - marshal_class_method is called on a serializable message instance
# and produces a serialized string.
#
# Each 'rpc' adds an RpcDesc to classes including this module, and
# #assert_rpc_descs_have_methods is used to ensure the including class
# provides methods with signatures that support all the descriptors.
module Dsl
# This configures the method names that the serializable message
# implementation uses to marshal and unmarshal messages.
#
# - unmarshal_class method must be a class method on the serializable
# message type that takes a string (byte stream) and produces and object
#
# - marshal_class_method is called on a serializable message instance
# and produces a serialized string.
#
# The Dsl verifies that the types in the descriptor have both the
# unmarshal and marshal methods.
attr_writer(:marshal_class_method, :unmarshal_class_method)
# This allows configuration of the service name.
attr_accessor(:service_name)
# Adds an RPC spec.
#
# Takes the RPC name and the classes representing the types to be
# serialized, and adds them to the including classes rpc_desc hash.
#
# input and output should both have the methods #marshal and #unmarshal
# that are responsible for writing and reading an object instance from a
# byte buffer respectively.
#
# @param name [String] the name of the rpc
# @param input [Object] the input parameter's class
# @param output [Object] the output parameter's class
def rpc(name, input, output)
fail(DuplicateRpcName, name) if rpc_descs.key? name
assert_can_marshal(input)
assert_can_marshal(output)
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
end
# The Dsl verifies that the types in the descriptor have both the
# unmarshal and marshal methods.
attr_writer(:marshal_class_method, :unmarshal_class_method)
def inherited(subclass)
# Each subclass should have a distinct class variable with its own
# rpc_descs
subclass.rpc_descs.merge!(rpc_descs)
subclass.service_name = service_name
end
# This allows configuration of the service name.
attr_accessor(:service_name)
# the name of the instance method used to marshal events to a byte
# stream.
def marshal_class_method
@marshal_class_method ||= :marshal
end
# Adds an RPC spec.
#
# Takes the RPC name and the classes representing the types to be
# serialized, and adds them to the including classes rpc_desc hash.
#
# input and output should both have the methods #marshal and #unmarshal
# that are responsible for writing and reading an object instance from a
# byte buffer respectively.
#
# @param name [String] the name of the rpc
# @param input [Object] the input parameter's class
# @param output [Object] the output parameter's class
def rpc(name, input, output)
fail(DuplicateRpcName, name) if rpc_descs.key? name
assert_can_marshal(input)
assert_can_marshal(output)
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
end
# the name of the class method used to unmarshal from a byte stream.
def unmarshal_class_method
@unmarshal_class_method ||= :unmarshal
end
def inherited(subclass)
# Each subclass should have a distinct class variable with its own
# rpc_descs
subclass.rpc_descs.merge!(rpc_descs)
subclass.service_name = service_name
end
def assert_can_marshal(cls)
cls = cls.type if cls.is_a? RpcDesc::Stream
mth = unmarshal_class_method
unless cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
mth = marshal_class_method
return if cls.methods.include? mth
# the name of the instance method used to marshal events to a byte
# stream.
def marshal_class_method
@marshal_class_method ||= :marshal
end
# the name of the class method used to unmarshal from a byte stream.
def unmarshal_class_method
@unmarshal_class_method ||= :unmarshal
end
def assert_can_marshal(cls)
cls = cls.type if cls.is_a? RpcDesc::Stream
mth = unmarshal_class_method
unless cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
mth = marshal_class_method
return if cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
# @param cls [Class] the class of a serializable type
# @return cls wrapped in a RpcDesc::Stream
def stream(cls)
assert_can_marshal(cls)
RpcDesc::Stream.new(cls)
end
# @param cls [Class] the class of a serializable type
# @return cls wrapped in a RpcDesc::Stream
def stream(cls)
assert_can_marshal(cls)
RpcDesc::Stream.new(cls)
end
# the RpcDescs defined for this GenericService, keyed by name.
def rpc_descs
@rpc_descs ||= {}
end
# the RpcDescs defined for this GenericService, keyed by name.
def rpc_descs
@rpc_descs ||= {}
end
# Creates a rpc client class with methods for accessing the methods
# currently in rpc_descs.
def rpc_stub_class
descs = rpc_descs
route_prefix = service_name
Class.new(ClientStub) do
# @param host [String] the host the stub connects to
# @param kw [KeywordArgs] the channel arguments, plus any optional
# args for configuring the client's channel
def initialize(host, **kw)
super(host, Core::CompletionQueue.new, **kw)
end
# Creates a rpc client class with methods for accessing the methods
# currently in rpc_descs.
def rpc_stub_class
descs = rpc_descs
route_prefix = service_name
Class.new(ClientStub) do
# @param host [String] the host the stub connects to
# @param kw [KeywordArgs] the channel arguments, plus any optional
# args for configuring the client's channel
def initialize(host, **kw)
super(host, Core::CompletionQueue.new, **kw)
end
# Used define_method to add a method for each rpc_desc. Each method
# calls the base class method for the given descriptor.
descs.each_pair do |name, desc|
mth_name = name.to_s.underscore.to_sym
marshal = desc.marshal_proc
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil|
logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil|
logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
end
# Used define_method to add a method for each rpc_desc. Each method
# calls the base class method for the given descriptor.
descs.each_pair do |name, desc|
mth_name = name.to_s.underscore.to_sym
marshal = desc.marshal_proc
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil|
logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil|
logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, &blk|
logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
end
end
end
end
end
# Asserts that the appropriate methods are defined for each added rpc
# spec. Is intended to aid verifying that server classes are correctly
# implemented.
def assert_rpc_descs_have_methods
rpc_descs.each_pair do |m, spec|
mth_name = m.to_s.underscore.to_sym
unless instance_methods.include?(mth_name)
fail "#{self} does not provide instance method '#{mth_name}'"
end
spec.assert_arity_matches(instance_method(mth_name))
# Asserts that the appropriate methods are defined for each added rpc
# spec. Is intended to aid verifying that server classes are correctly
# implemented.
def assert_rpc_descs_have_methods
rpc_descs.each_pair do |m, spec|
mth_name = m.to_s.underscore.to_sym
unless instance_methods.include?(mth_name)
fail "#{self} does not provide instance method '#{mth_name}'"
end
spec.assert_arity_matches(instance_method(mth_name))
end
end
end
def self.included(o)
o.extend(Dsl)
# Update to the use the service name including module. Proivde a default
# that can be nil e,g. when modules are declared dynamically.
return unless o.service_name.nil?
if o.name.nil?
o.service_name = 'GenericService'
def self.included(o)
o.extend(Dsl)
# Update to the use the service name including module. Proivde a default
# that can be nil e,g. when modules are declared dynamically.
return unless o.service_name.nil?
if o.name.nil?
o.service_name = 'GenericService'
else
modules = o.name.split('::')
if modules.length > 2
o.service_name = modules[modules.length - 2]
else
modules = o.name.split('::')
if modules.length > 2
o.service_name = modules[modules.length - 2]
else
o.service_name = modules.first
end
o.service_name = modules.first
end
end
end

@ -35,6 +35,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging
Logging.logger['Google::RPC'].level = :debug
Logging.logger['Google::RPC::ActiveCall'].level = :info
Logging.logger['Google::RPC::BidiCall'].level = :info
Logging.logger['GRPC'].level = :debug
Logging.logger['GRPC::ActiveCall'].level = :info
Logging.logger['GRPC::BidiCall'].level = :info

@ -27,9 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
module Google
# Google::RPC contains the General RPC module.
module RPC
VERSION = '0.0.1'
end
# GRPC contains the General RPC module.
module GRPC
VERSION = '0.0.1'
end

@ -36,9 +36,9 @@ require 'faraday'
require 'grpc/auth/compute_engine'
require 'spec_helper'
describe Google::RPC::Auth::GCECredentials do
describe GRPC::Auth::GCECredentials do
MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token'
GCECredentials = Google::RPC::Auth::GCECredentials
GCECredentials = GRPC::Auth::GCECredentials
before(:example) do
@client = GCECredentials.new

@ -38,7 +38,7 @@ require 'multi_json'
require 'openssl'
require 'spec_helper'
describe Google::RPC::Auth::ServiceAccountCredentials do
describe GRPC::Auth::ServiceAccountCredentials do
before(:example) do
@key = OpenSSL::PKey::RSA.new(2048)
cred_json = {
@ -49,7 +49,7 @@ describe Google::RPC::Auth::ServiceAccountCredentials do
type: 'service_account'
}
cred_json_text = MultiJson.dump(cred_json)
@client = Google::RPC::Auth::ServiceAccountCredentials.new(
@client = GRPC::Auth::ServiceAccountCredentials.new(
'https://www.googleapis.com/auth/userinfo.profile',
StringIO.new(cred_json_text))
end

Loading…
Cancel
Save