diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index f73b12c417f..8586915026c 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -202,9 +202,9 @@ static VALUE grpc_rb_byte_buffer_init(VALUE self, VALUE src) { /* rb_cByteBuffer is the ruby class that proxies grpc_byte_buffer. */ VALUE rb_cByteBuffer = Qnil; -void Init_google_rpc_byte_buffer() { +void Init_grpc_byte_buffer() { rb_cByteBuffer = - rb_define_class_under(rb_mGoogleRpcCore, "ByteBuffer", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "ByteBuffer", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cByteBuffer, grpc_rb_byte_buffer_alloc); diff --git a/src/ruby/ext/grpc/rb_byte_buffer.h b/src/ruby/ext/grpc/rb_byte_buffer.h index 322c268f377..1340c355e26 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.h +++ b/src/ruby/ext/grpc/rb_byte_buffer.h @@ -42,7 +42,7 @@ extern VALUE rb_cByteBuffer; /* Initializes the ByteBuffer class. */ -void Init_google_rpc_byte_buffer(); +void Init_grpc_byte_buffer(); /* grpc_rb_byte_buffer_create_with_mark creates a grpc_rb_byte_buffer with a * ruby mark object that will be kept alive while the byte_buffer is alive. */ diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 5d723076682..af5ef25e143 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -449,9 +449,9 @@ VALUE rb_cCall = Qnil; operations; */ VALUE rb_eCallError = Qnil; -void Init_google_rpc_error_codes() { +void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ - VALUE rb_RpcErrors = rb_define_module_under(rb_mGoogleRpcCore, "RpcErrors"); + VALUE rb_RpcErrors = rb_define_module_under(rb_mGrpcCore, "RpcErrors"); rb_define_const(rb_RpcErrors, "OK", UINT2NUM(GRPC_CALL_OK)); rb_define_const(rb_RpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR)); rb_define_const(rb_RpcErrors, "NOT_ON_SERVER", @@ -500,11 +500,11 @@ void Init_google_rpc_error_codes() { rb_obj_freeze(rb_error_code_details); } -void Init_google_rpc_call() { +void Init_grpc_call() { /* CallError inherits from Exception to signal that it is non-recoverable */ rb_eCallError = - rb_define_class_under(rb_mGoogleRpcCore, "CallError", rb_eException); - rb_cCall = rb_define_class_under(rb_mGoogleRpcCore, "Call", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "CallError", rb_eException); + rb_cCall = rb_define_class_under(rb_mGrpcCore, "Call", rb_cObject); /* Prevent allocation or inialization of the Call class */ rb_define_alloc_func(rb_cCall, grpc_rb_cannot_alloc); @@ -542,7 +542,7 @@ void Init_google_rpc_call() { hash_all_calls = rb_hash_new(); rb_define_const(rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls); - Init_google_rpc_error_codes(); + Init_grpc_error_codes(); } /* Gets the call from the ruby object */ diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index 965e9eef409..3dbe0b33284 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -54,6 +54,6 @@ extern VALUE rb_cCall; extern VALUE rb_eCallError; /* Initializes the Call class. */ -void Init_google_rpc_call(); +void Init_grpc_call(); #endif /* GRPC_RB_CALL_H_ */ diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 7c98e66c33d..e63656425d9 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -227,9 +227,9 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { /* rb_cChannel is the ruby class that proxies grpc_channel. */ VALUE rb_cChannel = Qnil; -void Init_google_rpc_channel() { +void Init_grpc_channel() { rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); - rb_cChannel = rb_define_class_under(rb_mGoogleRpcCore, "Channel", rb_cObject); + rb_cChannel = rb_define_class_under(rb_mGrpcCore, "Channel", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cChannel, grpc_rb_channel_alloc); diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h index 6c1210e812d..93155455dcb 100644 --- a/src/ruby/ext/grpc/rb_channel.h +++ b/src/ruby/ext/grpc/rb_channel.h @@ -41,7 +41,7 @@ extern VALUE rb_cChannel; /* Initializes the Channel class. */ -void Init_google_rpc_channel(); +void Init_grpc_channel(); /* Gets the wrapped channel from the ruby wrapper */ grpc_channel* grpc_rb_get_wrapped_channel(VALUE v); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 47776a991a1..e406336d17e 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -159,9 +159,9 @@ static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag, /* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */ VALUE rb_cCompletionQueue = Qnil; -void Init_google_rpc_completion_queue() { +void Init_grpc_completion_queue() { rb_cCompletionQueue = - rb_define_class_under(rb_mGoogleRpcCore, "CompletionQueue", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "CompletionQueue", rb_cObject); /* constructor: uses an alloc func without an initializer. Using a simple alloc func works here as the grpc header does not specify any args for diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index c563662c2d4..a8155b5f13a 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -45,6 +45,6 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); extern VALUE rb_cCompletionQueue; /* Initializes the CompletionQueue class. */ -void Init_google_rpc_completion_queue(); +void Init_grpc_completion_queue(); #endif /* GRPC_RB_COMPLETION_QUEUE_H_ */ diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c index 87a5d0a299c..44b2d03e6fd 100644 --- a/src/ruby/ext/grpc/rb_credentials.c +++ b/src/ruby/ext/grpc/rb_credentials.c @@ -245,9 +245,9 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { /* rb_cCredentials is the ruby class that proxies grpc_credentials. */ VALUE rb_cCredentials = Qnil; -void Init_google_rpc_credentials() { +void Init_grpc_credentials() { rb_cCredentials = - rb_define_class_under(rb_mGoogleRpcCore, "Credentials", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "Credentials", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cCredentials, grpc_rb_credentials_alloc); diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h index fada3639d58..efa30a2a565 100644 --- a/src/ruby/ext/grpc/rb_credentials.h +++ b/src/ruby/ext/grpc/rb_credentials.h @@ -42,7 +42,7 @@ extern VALUE rb_cCredentials; /* Initializes the ruby Credentials class. */ -void Init_google_rpc_credentials(); +void Init_grpc_credentials(); /* Gets the wrapped credentials from the ruby wrapper */ grpc_credentials* grpc_rb_get_wrapped_credentials(VALUE v); diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c index 72c9dd2ec00..38a90c13e79 100644 --- a/src/ruby/ext/grpc/rb_event.c +++ b/src/ruby/ext/grpc/rb_event.c @@ -312,10 +312,10 @@ VALUE rb_cEvent = Qnil; rpc event processing. */ VALUE rb_eEventError = Qnil; -void Init_google_rpc_event() { +void Init_grpc_event() { rb_eEventError = - rb_define_class_under(rb_mGoogleRpcCore, "EventError", rb_eStandardError); - rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "EventError", rb_eStandardError); + rb_cEvent = rb_define_class_under(rb_mGrpcCore, "Event", rb_cObject); /* Prevent allocation or inialization from ruby. */ rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc); @@ -332,7 +332,7 @@ void Init_google_rpc_event() { /* Constants representing the completion types */ rb_mCompletionType = - rb_define_module_under(rb_mGoogleRpcCore, "CompletionType"); + rb_define_module_under(rb_mGrpcCore, "CompletionType"); rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN", INT2NUM(GRPC_QUEUE_SHUTDOWN)); rb_define_const(rb_mCompletionType, "OP_COMPLETE", INT2NUM(GRPC_OP_COMPLETE)); diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h index a406e9e9f17..591a3528786 100644 --- a/src/ruby/ext/grpc/rb_event.h +++ b/src/ruby/ext/grpc/rb_event.h @@ -48,6 +48,6 @@ extern VALUE rb_eEventError; VALUE grpc_rb_new_event(grpc_event *ev); /* Initializes the Event and EventError classes. */ -void Init_google_rpc_event(); +void Init_grpc_event(); #endif /* GRPC_RB_EVENT_H_ */ diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 8feefb047cc..c8e2bb5c4ae 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -153,10 +153,10 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { return t; } -void Init_google_status_codes() { +void Init_grpc_status_codes() { /* Constants representing the status codes or grpc_status_code in status.h */ VALUE rb_mStatusCodes = - rb_define_module_under(rb_mGoogleRpcCore, "StatusCodes"); + rb_define_module_under(rb_mGrpcCore, "StatusCodes"); rb_define_const(rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK)); rb_define_const(rb_mStatusCodes, "CANCELLED", INT2NUM(GRPC_STATUS_CANCELLED)); rb_define_const(rb_mStatusCodes, "UNKNOWN", INT2NUM(GRPC_STATUS_UNKNOWN)); @@ -214,11 +214,11 @@ VALUE grpc_rb_time_val_to_s(VALUE self) { } /* Adds a module with constants that map to gpr's static timeval structs. */ -void Init_google_time_consts() { +void Init_grpc_time_consts() { VALUE rb_mTimeConsts = - rb_define_module_under(rb_mGoogleRpcCore, "TimeConsts"); + rb_define_module_under(rb_mGrpcCore, "TimeConsts"); rb_cTimeVal = - rb_define_class_under(rb_mGoogleRpcCore, "TimeSpec", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "TimeSpec", rb_cObject); rb_define_const(rb_mTimeConsts, "ZERO", Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE, (void *)&gpr_time_0)); @@ -240,37 +240,35 @@ void Init_google_time_consts() { void grpc_rb_shutdown(void *vm) { grpc_shutdown(); } -/* Initialize the Google RPC module structs */ +/* Initialize the GRPC module structs */ /* rb_sNewServerRpc is the struct that holds new server rpc details. */ VALUE rb_sNewServerRpc = Qnil; /* rb_sStatus is the struct that holds status details. */ VALUE rb_sStatus = Qnil; -/* Initialize the Google RPC module. */ -VALUE rb_mGoogle = Qnil; -VALUE rb_mGoogleRPC = Qnil; -VALUE rb_mGoogleRpcCore = Qnil; +/* Initialize the GRPC module. */ +VALUE rb_mGRPC = Qnil; +VALUE rb_mGrpcCore = Qnil; void Init_grpc() { grpc_init(); ruby_vm_at_exit(grpc_rb_shutdown); - rb_mGoogle = rb_define_module("Google"); - rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC"); - rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core"); + rb_mGRPC = rb_define_module("GRPC"); + rb_mGrpcCore = rb_define_module_under(rb_mGRPC, "Core"); rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host", "deadline", "metadata", NULL); rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL); - Init_google_rpc_byte_buffer(); - Init_google_rpc_event(); - Init_google_rpc_channel(); - Init_google_rpc_completion_queue(); - Init_google_rpc_call(); - Init_google_rpc_credentials(); - Init_google_rpc_metadata(); - Init_google_rpc_server(); - Init_google_rpc_server_credentials(); - Init_google_status_codes(); - Init_google_time_consts(); + Init_grpc_byte_buffer(); + Init_grpc_event(); + Init_grpc_channel(); + Init_grpc_completion_queue(); + Init_grpc_call(); + Init_grpc_credentials(); + Init_grpc_metadata(); + Init_grpc_server(); + Init_grpc_server_credentials(); + Init_grpc_status_codes(); + Init_grpc_time_consts(); } diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index d5e8930fca6..664b94fd261 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -38,11 +38,8 @@ #include #include -/* rb_mGoogle is the top-level Google module. */ -extern VALUE rb_mGoogle; - -/* rb_mGoogleRpcCore is the module containing the ruby wrapper GRPC classes. */ -extern VALUE rb_mGoogleRpcCore; +/* rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */ +extern VALUE rb_mGrpcCore; /* Class used to wrap timeval structs. */ extern VALUE rb_cTimeVal; diff --git a/src/ruby/ext/grpc/rb_metadata.c b/src/ruby/ext/grpc/rb_metadata.c index 88eb62ab738..55a429c9595 100644 --- a/src/ruby/ext/grpc/rb_metadata.c +++ b/src/ruby/ext/grpc/rb_metadata.c @@ -187,9 +187,9 @@ static VALUE grpc_rb_metadata_value(VALUE self) { /* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */ VALUE rb_cMetadata = Qnil; -void Init_google_rpc_metadata() { +void Init_grpc_metadata() { rb_cMetadata = - rb_define_class_under(rb_mGoogleRpcCore, "Metadata", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "Metadata", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc); diff --git a/src/ruby/ext/grpc/rb_metadata.h b/src/ruby/ext/grpc/rb_metadata.h index 329ef15c68a..8e452b64051 100644 --- a/src/ruby/ext/grpc/rb_metadata.h +++ b/src/ruby/ext/grpc/rb_metadata.h @@ -48,6 +48,6 @@ extern VALUE grpc_rb_metadata_create_with_mark(VALUE mark, grpc_metadata* md); grpc_metadata* grpc_rb_get_wrapped_metadata(VALUE v); /* Initializes the Metadata class. */ -void Init_google_rpc_metadata(); +void Init_grpc_metadata(); #endif /* GRPC_RB_METADATA_H_ */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index e68843e12b1..b2877e98e4e 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -251,8 +251,8 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { return INT2NUM(recvd_port); } -void Init_google_rpc_server() { - rb_cServer = rb_define_class_under(rb_mGoogleRpcCore, "Server", rb_cObject); +void Init_grpc_server() { + rb_cServer = rb_define_class_under(rb_mGrpcCore, "Server", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cServer, grpc_rb_server_alloc); diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h index 92047efd187..d9c6362f8a4 100644 --- a/src/ruby/ext/grpc/rb_server.h +++ b/src/ruby/ext/grpc/rb_server.h @@ -42,7 +42,7 @@ extern VALUE rb_cServer; /* Initializes the Server class. */ -void Init_google_rpc_server(); +void Init_grpc_server(); /* Gets the wrapped server from the ruby wrapper */ grpc_server* grpc_rb_get_wrapped_server(VALUE v); diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c index 4f6c67ea5e3..f825297225f 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.c +++ b/src/ruby/ext/grpc/rb_server_credentials.c @@ -184,9 +184,9 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs, grpc_server_credentials. */ VALUE rb_cServerCredentials = Qnil; -void Init_google_rpc_server_credentials() { +void Init_grpc_server_credentials() { rb_cServerCredentials = - rb_define_class_under(rb_mGoogleRpcCore, "ServerCredentials", rb_cObject); + rb_define_class_under(rb_mGrpcCore, "ServerCredentials", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(rb_cServerCredentials, grpc_rb_server_credentials_alloc); diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h index 2a2e1fbc822..2be627727ab 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.h +++ b/src/ruby/ext/grpc/rb_server_credentials.h @@ -42,7 +42,7 @@ extern VALUE rb_cServerCredentials; /* Initializes the ruby ServerCredentials class. */ -void Init_google_rpc_server_credentials(); +void Init_grpc_server_credentials(); /* Gets the wrapped server_credentials from the ruby wrapper */ grpc_server_credentials* grpc_rb_get_wrapped_server_credentials(VALUE v); diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 758ac0c2d16..3176a158452 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -39,6 +39,3 @@ require 'grpc/generic/active_call' require 'grpc/generic/client_stub' require 'grpc/generic/service' require 'grpc/generic/rpc_server' - -# alias GRPC -GRPC = Google::RPC diff --git a/src/ruby/lib/grpc/auth/compute_engine.rb b/src/ruby/lib/grpc/auth/compute_engine.rb index 9004bef46e5..5cb1e1a4dcf 100644 --- a/src/ruby/lib/grpc/auth/compute_engine.rb +++ b/src/ruby/lib/grpc/auth/compute_engine.rb @@ -30,39 +30,37 @@ require 'faraday' require 'grpc/auth/signet' -module Google - module RPC - # Module Auth provides classes that provide Google-specific authentication - # used to access Google gRPC services. - module Auth - # Extends Signet::OAuth2::Client so that the auth token is obtained from - # the GCE metadata server. - class GCECredentials < Signet::OAuth2::Client - COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\ - 'instance/service-accounts/default/token' - COMPUTE_CHECK_URI = 'http://metadata.google.internal' +module GRPC + # Module Auth provides classes that provide Google-specific authentication + # used to access Google gRPC services. + module Auth + # Extends Signet::OAuth2::Client so that the auth token is obtained from + # the GCE metadata server. + class GCECredentials < Signet::OAuth2::Client + COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\ + 'instance/service-accounts/default/token' + COMPUTE_CHECK_URI = 'http://metadata.google.internal' - # Detect if this appear to be a GCE instance, by checking if metadata - # is available - def self.on_gce?(options = {}) - c = options[:connection] || Faraday.default_connection - resp = c.get(COMPUTE_CHECK_URI) - return false unless resp.status == 200 - return false unless resp.headers.key?('Metadata-Flavor') - return resp.headers['Metadata-Flavor'] == 'Google' - rescue Faraday::ConnectionFailed - return false - end + # Detect if this appear to be a GCE instance, by checking if metadata + # is available + def self.on_gce?(options = {}) + c = options[:connection] || Faraday.default_connection + resp = c.get(COMPUTE_CHECK_URI) + return false unless resp.status == 200 + return false unless resp.headers.key?('Metadata-Flavor') + return resp.headers['Metadata-Flavor'] == 'Google' + rescue Faraday::ConnectionFailed + return false + end - # Overrides the super class method to change how access tokens are - # fetched. - def fetch_access_token(options = {}) - c = options[:connection] || Faraday.default_connection - c.headers = { 'Metadata-Flavor' => 'Google' } - resp = c.get(COMPUTE_AUTH_TOKEN_URI) - Signet::OAuth2.parse_credentials(resp.body, - resp.headers['content-type']) - end + # Overrides the super class method to change how access tokens are + # fetched. + def fetch_access_token(options = {}) + c = options[:connection] || Faraday.default_connection + c.headers = { 'Metadata-Flavor' => 'Google' } + resp = c.get(COMPUTE_AUTH_TOKEN_URI) + Signet::OAuth2.parse_credentials(resp.body, + resp.headers['content-type']) end end end diff --git a/src/ruby/lib/grpc/auth/service_account.rb b/src/ruby/lib/grpc/auth/service_account.rb index 35b5cbfe2de..14b81a9e034 100644 --- a/src/ruby/lib/grpc/auth/service_account.rb +++ b/src/ruby/lib/grpc/auth/service_account.rb @@ -39,29 +39,27 @@ def read_json_key(json_key_io) [json_key['private_key'], json_key['client_email']] end -module Google - module RPC - # Module Auth provides classes that provide Google-specific authentication - # used to access Google gRPC services. - module Auth - # Authenticates requests using Google's Service Account credentials. - # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount) - class ServiceAccountCredentials < Signet::OAuth2::Client - TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token' - AUDIENCE = TOKEN_CRED_URI +module GRPC + # Module Auth provides classes that provide Google-specific authentication + # used to access Google gRPC services. + module Auth + # Authenticates requests using Google's Service Account credentials. + # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount) + class ServiceAccountCredentials < Signet::OAuth2::Client + TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token' + AUDIENCE = TOKEN_CRED_URI - # Initializes a ServiceAccountCredentials. - # - # @param scope [string|array] the scope(s) to access - # @param json_key_io [IO] an IO from which the JSON key can be read - def initialize(scope, json_key_io) - private_key, client_email = read_json_key(json_key_io) - super(token_credential_uri: TOKEN_CRED_URI, - audience: AUDIENCE, - scope: scope, - issuer: client_email, - signing_key: OpenSSL::PKey::RSA.new(private_key)) - end + # Initializes a ServiceAccountCredentials. + # + # @param scope [string|array] the scope(s) to access + # @param json_key_io [IO] an IO from which the JSON key can be read + def initialize(scope, json_key_io) + private_key, client_email = read_json_key(json_key_io) + super(token_credential_uri: TOKEN_CRED_URI, + audience: AUDIENCE, + scope: scope, + issuer: client_email, + signing_key: OpenSSL::PKey::RSA.new(private_key)) end end end diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/core/event.rb index 9a333589c21..27df86b4b6d 100644 --- a/src/ruby/lib/grpc/core/event.rb +++ b/src/ruby/lib/grpc/core/event.rb @@ -27,16 +27,17 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -module Google - module RPC - module Core - # Event is a class defined in the c extension - # - # Here, we add an inspect method. - class Event - def inspect - "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>" - end +require 'grpc' + +# GRPC contains the General RPC module. +module GRPC + module Core + # Event is a class defined in the c extension + # + # Here, we add an inspect method. + class Event + def inspect + "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>" end end end diff --git a/src/ruby/lib/grpc/core/time_consts.rb b/src/ruby/lib/grpc/core/time_consts.rb index 6876dcb02eb..3f3414cfda7 100644 --- a/src/ruby/lib/grpc/core/time_consts.rb +++ b/src/ruby/lib/grpc/core/time_consts.rb @@ -29,44 +29,43 @@ require 'grpc' -module Google - module RPC - module Core - # TimeConsts is a module from the C extension. +# GRPC contains the General RPC module. +module GRPC + module Core + # TimeConsts is a module from the C extension. + # + # Here it's re-opened to add a utility func. + module TimeConsts + # Converts a time delta to an absolute deadline. # - # Here it's re-opened to add a utility func. - module TimeConsts - # Converts a time delta to an absolute deadline. - # - # Assumes timeish is a relative time, and converts its to an absolute, - # with following exceptions: - # - # * if timish is one of the TimeConsts.TimeSpec constants the value is - # preserved. - # * timish < 0 => TimeConsts.INFINITE_FUTURE - # * timish == 0 => TimeConsts.ZERO - # - # @param timeish [Number|TimeSpec] - # @return timeish [Number|TimeSpec] - def from_relative_time(timeish) - if timeish.is_a? TimeSpec - timeish - elsif timeish.nil? - TimeConsts::ZERO - elsif !timeish.is_a? Numeric - fail(TypeError, - "Cannot make an absolute deadline from #{timeish.inspect}") - elsif timeish < 0 - TimeConsts::INFINITE_FUTURE - elsif timeish == 0 - TimeConsts::ZERO - else - Time.now + timeish - end + # Assumes timeish is a relative time, and converts its to an absolute, + # with following exceptions: + # + # * if timish is one of the TimeConsts.TimeSpec constants the value is + # preserved. + # * timish < 0 => TimeConsts.INFINITE_FUTURE + # * timish == 0 => TimeConsts.ZERO + # + # @param timeish [Number|TimeSpec] + # @return timeish [Number|TimeSpec] + def from_relative_time(timeish) + if timeish.is_a? TimeSpec + timeish + elsif timeish.nil? + TimeConsts::ZERO + elsif !timeish.is_a? Numeric + fail(TypeError, + "Cannot make an absolute deadline from #{timeish.inspect}") + elsif timeish < 0 + TimeConsts::INFINITE_FUTURE + elsif timeish == 0 + TimeConsts::ZERO + else + Time.now + timeish end - - module_function :from_relative_time end + + module_function :from_relative_time end end end diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 70a92bfed77..c2356b7004e 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -29,35 +29,33 @@ require 'grpc' -module Google - # Google::RPC contains the General RPC module. - module RPC - # OutOfTime is an exception class that indicates that an RPC exceeded its - # deadline. - OutOfTime = Class.new(StandardError) +# GRPC contains the General RPC module. +module GRPC + # OutOfTime is an exception class that indicates that an RPC exceeded its + # deadline. + OutOfTime = Class.new(StandardError) - # BadStatus is an exception class that indicates that an error occurred at - # either end of a GRPC connection. When raised, it indicates that a status - # error should be returned to the other end of a GRPC connection; when - # caught it means that this end received a status error. - class BadStatus < StandardError - attr_reader :code, :details + # BadStatus is an exception class that indicates that an error occurred at + # either end of a GRPC connection. When raised, it indicates that a status + # error should be returned to the other end of a GRPC connection; when + # caught it means that this end received a status error. + class BadStatus < StandardError + attr_reader :code, :details - # @param code [Numeric] the status code - # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') - super("#{code}:#{details}") - @code = code - @details = details - end + # @param code [Numeric] the status code + # @param details [String] the details of the exception + def initialize(code, details = 'unknown cause') + super("#{code}:#{details}") + @code = code + @details = details + end - # Converts the exception to a GRPC::Status for use in the networking - # wrapper layer. - # - # @return [Status] with the same code and details - def to_status - Status.new(code, details) - end + # Converts the exception to a GRPC::Status for use in the networking + # wrapper layer. + # + # @return [Status] with the same code and details + def to_status + Status.new(code, details) end end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 6c2b6e91c24..5a4f129dce0 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -36,502 +36,500 @@ def assert_event_type(ev, want) fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want end -module Google - # Google::RPC contains the General RPC module. - module RPC - # The ActiveCall class provides simple methods for sending marshallable - # data to a call - class ActiveCall - include Core::CompletionType - include Core::StatusCodes - include Core::TimeConsts - attr_reader(:deadline) - - # client_invoke begins a client invocation. - # - # Flow Control note: this blocks until flow control accepts that client - # request can go ahead. - # - # deadline is the absolute deadline for the call. - # - # == Keyword Arguments == - # any keyword arguments are treated as metadata to be sent to the server - # if a keyword value is a list, multiple metadata for it's key are sent - # - # @param call [Call] a call on which to start and invocation - # @param q [CompletionQueue] the completion queue - # @param deadline [Fixnum,TimeSpec] the deadline - def self.client_invoke(call, q, _deadline, **kw) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call - unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - call.add_metadata(kw) if kw.length > 0 - client_metadata_read = Object.new - finished_tag = Object.new - call.invoke(q, client_metadata_read, finished_tag) - [finished_tag, client_metadata_read] - end - - # Creates an ActiveCall. - # - # ActiveCall should only be created after a call is accepted. That - # means different things on a client and a server. On the client, the - # call is accepted after calling call.invoke. On the server, this is - # after call.accept. - # - # #initialize cannot determine if the call is accepted or not; so if a - # call that's not accepted is used here, the error won't be visible until - # the ActiveCall methods are called. - # - # deadline is the absolute deadline for the call. - # - # @param call [Call] the call used by the ActiveCall - # @param q [CompletionQueue] the completion queue used to accept - # the call - # @param marshal [Function] f(obj)->string that marshal requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - # if the call has begun - # @param read_metadata_tag [Object] the object used as the call's finish - # tag, if the call has begun - # @param started [true|false] indicates if the call has begun - def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil, - read_metadata_tag: nil, started: true) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call - unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - @call = call - @cq = q - @deadline = deadline - @finished_tag = finished_tag - @read_metadata_tag = read_metadata_tag - @marshal = marshal - @started = started - @unmarshal = unmarshal +# GRPC contains the General RPC module. +module GRPC + # The ActiveCall class provides simple methods for sending marshallable + # data to a call + class ActiveCall + include Core::CompletionType + include Core::StatusCodes + include Core::TimeConsts + attr_reader(:deadline) + + # client_invoke begins a client invocation. + # + # Flow Control note: this blocks until flow control accepts that client + # request can go ahead. + # + # deadline is the absolute deadline for the call. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # + # @param call [Call] a call on which to start and invocation + # @param q [CompletionQueue] the completion queue + # @param deadline [Fixnum,TimeSpec] the deadline + def self.client_invoke(call, q, _deadline, **kw) + fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + unless q.is_a? Core::CompletionQueue + fail(ArgumentError, 'not a CompletionQueue') end + call.add_metadata(kw) if kw.length > 0 + client_metadata_read = Object.new + finished_tag = Object.new + call.invoke(q, client_metadata_read, finished_tag) + [finished_tag, client_metadata_read] + end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status + # Creates an ActiveCall. + # + # ActiveCall should only be created after a call is accepted. That + # means different things on a client and a server. On the client, the + # call is accepted after calling call.invoke. On the server, this is + # after call.accept. + # + # #initialize cannot determine if the call is accepted or not; so if a + # call that's not accepted is used here, the error won't be visible until + # the ActiveCall methods are called. + # + # deadline is the absolute deadline for the call. + # + # @param call [Call] the call used by the ActiveCall + # @param q [CompletionQueue] the completion queue used to accept + # the call + # @param marshal [Function] f(obj)->string that marshal requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Fixnum] the deadline for the call to complete + # @param finished_tag [Object] the object used as the call's finish tag, + # if the call has begun + # @param read_metadata_tag [Object] the object used as the call's finish + # tag, if the call has begun + # @param started [true|false] indicates if the call has begun + def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil, + read_metadata_tag: nil, started: true) + fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + unless q.is_a? Core::CompletionQueue + fail(ArgumentError, 'not a CompletionQueue') end + @call = call + @cq = q + @deadline = deadline + @finished_tag = finished_tag + @read_metadata_tag = read_metadata_tag + @marshal = marshal + @started = started + @unmarshal = unmarshal + end - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end + # Obtains the status of the call. + # + # this value is nil until the call completes + # @return this call's status + def status + @call.status + end - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end + # Obtains the metadata of the call. + # + # At the start of the call this will be nil. During the call this gets + # some values as soon as the other end of the connection acknowledges the + # request. + # + # @return this calls's metadata + def metadata + @call.metadata + end - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end + # Cancels the call. + # + # Cancels the call. The call does not return any result, but once this it + # has been called, the call should eventually terminate. Due to potential + # races between the execution of the cancel and the in-flight request, the + # result of the call after calling #cancel is indeterminate: + # + # - the call may terminate with a BadStatus exception, with code=CANCELLED + # - the call may terminate with OK Status, and return a response + # - the call may terminate with a different BadStatus exception if that + # was happening + def cancel + @call.cancel + end - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false - end + # indicates if the call is shutdown + def shutdown + @shutdown ||= false + end - # multi_req_view provides a restricted view of this ActiveCall for use - # in a server client-streaming handler. - def multi_req_view - MultiReqView.new(self) - end + # indicates if the call is cancelled. + def cancelled + @cancelled ||= false + end - # single_req_view provides a restricted view of this ActiveCall for use in - # a server request-response handler. - def single_req_view - SingleReqView.new(self) - end + # multi_req_view provides a restricted view of this ActiveCall for use + # in a server client-streaming handler. + def multi_req_view + MultiReqView.new(self) + end - # operation provides a restricted view of this ActiveCall for use as - # a Operation. - def operation - Operation.new(self) - end + # single_req_view provides a restricted view of this ActiveCall for use in + # a server request-response handler. + def single_req_view + SingleReqView.new(self) + end - # writes_done indicates that all writes are completed. - # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event, unless assert_finished is set to false. Any calls to - # #remote_send after this call will fail. - # - # @param assert_finished [true, false] when true(default), waits for - # FINISHED. - def writes_done(assert_finished = true) - @call.writes_done(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - logger.debug("Writes done: waiting for finish? #{assert_finished}") - ensure - ev.close - end + # operation provides a restricted view of this ActiveCall for use as + # a Operation. + def operation + Operation.new(self) + end - return unless assert_finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - fail 'unexpected nil event' if ev.nil? + # writes_done indicates that all writes are completed. + # + # It blocks until the remote endpoint acknowledges by sending a FINISHED + # event, unless assert_finished is set to false. Any calls to + # #remote_send after this call will fail. + # + # @param assert_finished [true, false] when true(default), waits for + # FINISHED. + def writes_done(assert_finished = true) + @call.writes_done(self) + ev = @cq.pluck(self, INFINITE_FUTURE) + begin + assert_event_type(ev, FINISH_ACCEPTED) + logger.debug("Writes done: waiting for finish? #{assert_finished}") + ensure ev.close - @call.status end - # finished waits until the call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event. - def finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED - if @call.metadata.nil? - @call.metadata = ev.result.metadata - else - @call.metadata.merge!(ev.result.metadata) - end - - if ev.result.code != Core::StatusCodes::OK - fail BadStatus.new(ev.result.code, ev.result.details) - end - res = ev.result - ensure - ev.close - end - res - end + return unless assert_finished + ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) + fail 'unexpected nil event' if ev.nil? + ev.close + @call.status + end - # remote_send sends a request to the remote endpoint. - # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. - # - # @param req [Object, String] the object to send or it's marshal form. - # @param marshalled [false, true] indicates if the object is already - # marshalled. - def remote_send(req, marshalled = false) - assert_queue_is_ready - logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") - if marshalled - payload = req + # finished waits until the call is completed. + # + # It blocks until the remote endpoint acknowledges by sending a FINISHED + # event. + def finished + ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) + begin + fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED + if @call.metadata.nil? + @call.metadata = ev.result.metadata else - payload = @marshal.call(req) - end - @call.start_write(Core::ByteBuffer.new(payload), self) - - # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return - # until the flow control allows another send on this call. - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close + @call.metadata.merge!(ev.result.metadata) end - end - # send_status sends a status to the remote endpoint - # - # @param code [int] the status code to send - # @param details [String] details - # @param assert_finished [true, false] when true(default), waits for - # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) - assert_queue_is_ready - @call.start_write_status(code, details, self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close + if ev.result.code != Core::StatusCodes::OK + fail BadStatus.new(ev.result.code, ev.result.details) end - logger.debug("Status sent: #{code}:'#{details}'") - return finished if assert_finished - nil + res = ev.result + ensure + ev.close end + res + end - # remote_read reads a response from the remote endpoint. - # - # It blocks until the remote endpoint sends a READ or FINISHED event. On - # a READ, it returns the response after unmarshalling it. On - # FINISHED, it returns nil if the status is OK, otherwise raising - # BadStatus - def remote_read - if @call.metadata.nil? && !@read_metadata_tag.nil? - ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) - assert_event_type(ev, CLIENT_METADATA_READ) - @call.metadata = ev.result - @read_metadata_tag = nil - end + # remote_send sends a request to the remote endpoint. + # + # It blocks until the remote endpoint acknowledges by sending a + # WRITE_ACCEPTED. req can be marshalled already. + # + # @param req [Object, String] the object to send or it's marshal form. + # @param marshalled [false, true] indicates if the object is already + # marshalled. + def remote_send(req, marshalled = false) + assert_queue_is_ready + logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") + if marshalled + payload = req + else + payload = @marshal.call(req) + end + @call.start_write(Core::ByteBuffer.new(payload), self) + + # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return + # until the flow control allows another send on this call. + ev = @cq.pluck(self, INFINITE_FUTURE) + begin + assert_event_type(ev, WRITE_ACCEPTED) + ensure + ev.close + end + end - @call.start_read(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) - logger.debug("received req: #{ev.result.inspect}") - unless ev.result.nil? - logger.debug("received req.to_s: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - logger.debug("received_req (unmarshalled): #{res.inspect}") - return res - end - ensure - ev.close - end - logger.debug('found nil; the final response has been sent') - nil + # send_status sends a status to the remote endpoint + # + # @param code [int] the status code to send + # @param details [String] details + # @param assert_finished [true, false] when true(default), waits for + # FINISHED. + def send_status(code = OK, details = '', assert_finished = false) + assert_queue_is_ready + @call.start_write_status(code, details, self) + ev = @cq.pluck(self, INFINITE_FUTURE) + begin + assert_event_type(ev, FINISH_ACCEPTED) + ensure + ev.close end + logger.debug("Status sent: #{code}:'#{details}'") + return finished if assert_finished + nil + end - # each_remote_read passes each response to the given block or returns an - # enumerator the responses if no block is given. - # - # == Enumerator == - # - # * #next blocks until the remote endpoint sends a READ or FINISHED - # * for each read, enumerator#next yields the response - # * on status - # * if it's is OK, enumerator#next raises StopException - # * if is not OK, enumerator#next raises RuntimeException - # - # == Block == - # - # * if provided it is executed for each response - # * the call blocks until no more responses are provided - # - # @return [Enumerator] if no block was given - def each_remote_read - return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.is_a? Struct::Status # is an OK status - break if resp.nil? # the last response was received - yield resp - end + # remote_read reads a response from the remote endpoint. + # + # It blocks until the remote endpoint sends a READ or FINISHED event. On + # a READ, it returns the response after unmarshalling it. On + # FINISHED, it returns nil if the status is OK, otherwise raising + # BadStatus + def remote_read + if @call.metadata.nil? && !@read_metadata_tag.nil? + ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) + assert_event_type(ev, CLIENT_METADATA_READ) + @call.metadata = ev.result + @read_metadata_tag = nil end - # each_remote_read_then_finish passes each response to the given block or - # returns an enumerator of the responses if no block is given. - # - # It is like each_remote_read, but it blocks on finishing on detecting - # the final message. - # - # == Enumerator == - # - # * #next blocks until the remote endpoint sends a READ or FINISHED - # * for each read, enumerator#next yields the response - # * on status - # * if it's is OK, enumerator#next raises StopException - # * if is not OK, enumerator#next raises RuntimeException - # - # == Block == - # - # * if provided it is executed for each response - # * the call blocks until no more responses are provided - # - # @return [Enumerator] if no block was given - def each_remote_read_then_finish - return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - break if resp.is_a? Struct::Status # is an OK status - if resp.nil? # the last response was received, but not finished yet - finished - break - end - yield resp + @call.start_read(self) + ev = @cq.pluck(self, INFINITE_FUTURE) + begin + assert_event_type(ev, READ) + logger.debug("received req: #{ev.result.inspect}") + unless ev.result.nil? + logger.debug("received req.to_s: #{ev.result}") + res = @unmarshal.call(ev.result.to_s) + logger.debug("received_req (unmarshalled): #{res.inspect}") + return res end + ensure + ev.close end + logger.debug('found nil; the final response has been sent') + nil + end - # request_response sends a request to a GRPC server, and returns the - # response. - # - # == Keyword Arguments == - # any keyword arguments are treated as metadata to be sent to the server - # if a keyword value is a list, multiple metadata for it's key are sent - # - # @param req [Object] the request sent to the server - # @return [Object] the response received from the server - def request_response(req, **kw) - start_call(**kw) unless @started - remote_send(req) - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response + # each_remote_read passes each response to the given block or returns an + # enumerator the responses if no block is given. + # + # == Enumerator == + # + # * #next blocks until the remote endpoint sends a READ or FINISHED + # * for each read, enumerator#next yields the response + # * on status + # * if it's is OK, enumerator#next raises StopException + # * if is not OK, enumerator#next raises RuntimeException + # + # == Block == + # + # * if provided it is executed for each response + # * the call blocks until no more responses are provided + # + # @return [Enumerator] if no block was given + def each_remote_read + return enum_for(:each_remote_read) unless block_given? + loop do + resp = remote_read + break if resp.is_a? Struct::Status # is an OK status + break if resp.nil? # the last response was received + yield resp end + end - # client_streamer sends a stream of requests to a GRPC server, and - # returns a single response. - # - # requests provides an 'iterable' of Requests. I.e. it follows Ruby's - # #each enumeration protocol. In the simplest case, requests will be an - # array of marshallable objects; in typical case it will be an Enumerable - # that allows dynamic construction of the marshallable objects. - # - # == Keyword Arguments == - # any keyword arguments are treated as metadata to be sent to the server - # if a keyword value is a list, multiple metadata for it's key are sent - # - # @param requests [Object] an Enumerable of requests to send - # @return [Object] the response received from the server - def client_streamer(requests, **kw) - start_call(**kw) unless @started - requests.each { |r| remote_send(r) } - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response + # each_remote_read_then_finish passes each response to the given block or + # returns an enumerator of the responses if no block is given. + # + # It is like each_remote_read, but it blocks on finishing on detecting + # the final message. + # + # == Enumerator == + # + # * #next blocks until the remote endpoint sends a READ or FINISHED + # * for each read, enumerator#next yields the response + # * on status + # * if it's is OK, enumerator#next raises StopException + # * if is not OK, enumerator#next raises RuntimeException + # + # == Block == + # + # * if provided it is executed for each response + # * the call blocks until no more responses are provided + # + # @return [Enumerator] if no block was given + def each_remote_read_then_finish + return enum_for(:each_remote_read_then_finish) unless block_given? + loop do + resp = remote_read + break if resp.is_a? Struct::Status # is an OK status + if resp.nil? # the last response was received, but not finished yet + finished + break + end + yield resp end + end - # server_streamer sends one request to the GRPC server, which yields a - # stream of responses. - # - # responses provides an enumerator over the streamed responses, i.e. it - # follows Ruby's #each iteration protocol. The enumerator blocks while - # waiting for each response, stops when the server signals that no - # further responses will be supplied. If the implicit block is provided, - # it is executed with each response as the argument and no result is - # returned. - # - # == Keyword Arguments == - # any keyword arguments are treated as metadata to be sent to the server - # if a keyword value is a list, multiple metadata for it's key are sent - # any keyword arguments are treated as metadata to be sent to the server. - # - # @param req [Object] the request sent to the server - # @return [Enumerator|nil] a response Enumerator - def server_streamer(req, **kw) - start_call(**kw) unless @started - remote_send(req) - writes_done(false) - replies = enum_for(:each_remote_read_then_finish) - return replies unless block_given? - replies.each { |r| yield r } - end + # request_response sends a request to a GRPC server, and returns the + # response. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # + # @param req [Object] the request sent to the server + # @return [Object] the response received from the server + def request_response(req, **kw) + start_call(**kw) unless @started + remote_send(req) + writes_done(false) + response = remote_read + finished unless response.is_a? Struct::Status + response + end - # bidi_streamer sends a stream of requests to the GRPC server, and yields - # a stream of responses. - # - # This method takes an Enumerable of requests, and returns and enumerable - # of responses. - # - # == requests == - # - # requests provides an 'iterable' of Requests. I.e. it follows Ruby's - # #each enumeration protocol. In the simplest case, requests will be an - # array of marshallable objects; in typical case it will be an - # Enumerable that allows dynamic construction of the marshallable - # objects. - # - # == responses == - # - # This is an enumerator of responses. I.e, its #next method blocks - # waiting for the next response. Also, if at any point the block needs - # to consume all the remaining responses, this can be done using #each or - # #collect. Calling #each or #collect should only be done if - # the_call#writes_done has been called, otherwise the block will loop - # forever. - # - # == Keyword Arguments == - # any keyword arguments are treated as metadata to be sent to the server - # if a keyword value is a list, multiple metadata for it's key are sent - # - # @param requests [Object] an Enumerable of requests to send - # @return [Enumerator, nil] a response Enumerator - def bidi_streamer(requests, **kw, &blk) - start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) - bd.run_on_client(requests, &blk) - end + # client_streamer sends a stream of requests to a GRPC server, and + # returns a single response. + # + # requests provides an 'iterable' of Requests. I.e. it follows Ruby's + # #each enumeration protocol. In the simplest case, requests will be an + # array of marshallable objects; in typical case it will be an Enumerable + # that allows dynamic construction of the marshallable objects. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # + # @param requests [Object] an Enumerable of requests to send + # @return [Object] the response received from the server + def client_streamer(requests, **kw) + start_call(**kw) unless @started + requests.each { |r| remote_send(r) } + writes_done(false) + response = remote_read + finished unless response.is_a? Struct::Status + response + end - # run_server_bidi orchestrates a BiDi stream processing on a server. - # - # N.B. gen_each_reply is a func(Enumerable) - # - # It takes an enumerable of requests as an arg, in case there is a - # relationship between the stream of requests and the stream of replies. - # - # This does not mean that must necessarily be one. E.g, the replies - # produced by gen_each_reply could ignore the received_msgs - # - # @param gen_each_reply [Proc] generates the BiDi stream replies - def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) - bd.run_on_server(gen_each_reply) - end + # server_streamer sends one request to the GRPC server, which yields a + # stream of responses. + # + # responses provides an enumerator over the streamed responses, i.e. it + # follows Ruby's #each iteration protocol. The enumerator blocks while + # waiting for each response, stops when the server signals that no + # further responses will be supplied. If the implicit block is provided, + # it is executed with each response as the argument and no result is + # returned. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # any keyword arguments are treated as metadata to be sent to the server. + # + # @param req [Object] the request sent to the server + # @return [Enumerator|nil] a response Enumerator + def server_streamer(req, **kw) + start_call(**kw) unless @started + remote_send(req) + writes_done(false) + replies = enum_for(:each_remote_read_then_finish) + return replies unless block_given? + replies.each { |r| yield r } + end + + # bidi_streamer sends a stream of requests to the GRPC server, and yields + # a stream of responses. + # + # This method takes an Enumerable of requests, and returns and enumerable + # of responses. + # + # == requests == + # + # requests provides an 'iterable' of Requests. I.e. it follows Ruby's + # #each enumeration protocol. In the simplest case, requests will be an + # array of marshallable objects; in typical case it will be an + # Enumerable that allows dynamic construction of the marshallable + # objects. + # + # == responses == + # + # This is an enumerator of responses. I.e, its #next method blocks + # waiting for the next response. Also, if at any point the block needs + # to consume all the remaining responses, this can be done using #each or + # #collect. Calling #each or #collect should only be done if + # the_call#writes_done has been called, otherwise the block will loop + # forever. + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + # + # @param requests [Object] an Enumerable of requests to send + # @return [Enumerator, nil] a response Enumerator + def bidi_streamer(requests, **kw, &blk) + start_call(**kw) unless @started + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, + @finished_tag) + bd.run_on_client(requests, &blk) + end - private + # run_server_bidi orchestrates a BiDi stream processing on a server. + # + # N.B. gen_each_reply is a func(Enumerable) + # + # It takes an enumerable of requests as an arg, in case there is a + # relationship between the stream of requests and the stream of replies. + # + # This does not mean that must necessarily be one. E.g, the replies + # produced by gen_each_reply could ignore the received_msgs + # + # @param gen_each_reply [Proc] generates the BiDi stream replies + def run_server_bidi(gen_each_reply) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, + @finished_tag) + bd.run_on_server(gen_each_reply) + end - def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags - @started = true - end + private - def self.view_class(*visible_methods) - Class.new do - extend ::Forwardable - def_delegators :@wrapped, *visible_methods + def start_call(**kw) + tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) + @finished_tag, @read_metadata_tag = tags + @started = true + end - # @param wrapped [ActiveCall] the call whose methods are shielded - def initialize(wrapped) - @wrapped = wrapped - end + def self.view_class(*visible_methods) + Class.new do + extend ::Forwardable + def_delegators :@wrapped, *visible_methods + + # @param wrapped [ActiveCall] the call whose methods are shielded + def initialize(wrapped) + @wrapped = wrapped end end + end - # SingleReqView limits access to an ActiveCall's methods for use in server - # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline) - - # MultiReqView limits access to an ActiveCall's methods for use in - # server client_streamer handlers. - MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read) - - # Operation limits access to an ActiveCall's methods for use as - # a Operation on the client. - Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) - - # confirms that no events are enqueued, and that the queue is not - # shutdown. - def assert_queue_is_ready - ev = nil - begin - ev = @cq.pluck(self, ZERO) - fail "unexpected event #{ev.inspect}" unless ev.nil? - rescue OutOfTime - logging.debug('timed out waiting for next event') - # expected, nothing should be on the queue and the deadline was ZERO, - # except things using another tag - ensure - ev.close unless ev.nil? - end + # SingleReqView limits access to an ActiveCall's methods for use in server + # handlers that receive just one request. + SingleReqView = view_class(:cancelled, :deadline) + + # MultiReqView limits access to an ActiveCall's methods for use in + # server client_streamer handlers. + MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, + :each_remote_read) + + # Operation limits access to an ActiveCall's methods for use as + # a Operation on the client. + Operation = view_class(:cancel, :cancelled, :deadline, :execute, + :metadata, :status) + + # confirms that no events are enqueued, and that the queue is not + # shutdown. + def assert_queue_is_ready + ev = nil + begin + ev = @cq.pluck(self, ZERO) + fail "unexpected event #{ev.inspect}" unless ev.nil? + rescue OutOfTime + logging.debug('timed out waiting for next event') + # expected, nothing should be on the queue and the deadline was ZERO, + # except things using another tag + ensure + ev.close unless ev.nil? end end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 099d57151c0..8350690fdf3 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -36,186 +36,184 @@ def assert_event_type(ev, want) fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want end -module Google - # Google::RPC contains the General RPC module. - module RPC - # The BiDiCall class orchestrates exection of a BiDi stream on a client or - # server. - class BidiCall - include Core::CompletionType - include Core::StatusCodes - include Core::TimeConsts +# GRPC contains the General RPC module. +module GRPC + # The BiDiCall class orchestrates exection of a BiDi stream on a client or + # server. + class BidiCall + include Core::CompletionType + include Core::StatusCodes + include Core::TimeConsts - # Creates a BidiCall. - # - # BidiCall should only be created after a call is accepted. That means - # different things on a client and a server. On the client, the call is - # accepted after call.invoke. On the server, this is after call.accept. - # - # #initialize cannot determine if the call is accepted or not; so if a - # call that's not accepted is used here, the error won't be visible until - # the BidiCall#run is called. - # - # deadline is the absolute deadline for the call. - # - # @param call [Call] the call used by the ActiveCall - # @param q [CompletionQueue] the completion queue used to accept - # the call - # @param marshal [Function] f(obj)->string that marshal requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - def initialize(call, q, marshal, unmarshal, deadline, finished_tag) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call - unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - @call = call - @cq = q - @deadline = deadline - @finished_tag = finished_tag - @marshal = marshal - @readq = Queue.new - @unmarshal = unmarshal + # Creates a BidiCall. + # + # BidiCall should only be created after a call is accepted. That means + # different things on a client and a server. On the client, the call is + # accepted after call.invoke. On the server, this is after call.accept. + # + # #initialize cannot determine if the call is accepted or not; so if a + # call that's not accepted is used here, the error won't be visible until + # the BidiCall#run is called. + # + # deadline is the absolute deadline for the call. + # + # @param call [Call] the call used by the ActiveCall + # @param q [CompletionQueue] the completion queue used to accept + # the call + # @param marshal [Function] f(obj)->string that marshal requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Fixnum] the deadline for the call to complete + # @param finished_tag [Object] the object used as the call's finish tag, + def initialize(call, q, marshal, unmarshal, deadline, finished_tag) + fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + unless q.is_a? Core::CompletionQueue + fail(ArgumentError, 'not a CompletionQueue') end + @call = call + @cq = q + @deadline = deadline + @finished_tag = finished_tag + @marshal = marshal + @readq = Queue.new + @unmarshal = unmarshal + end - # Begins orchestration of the Bidi stream for a client sending requests. - # - # The method either returns an Enumerator of the responses, or accepts a - # block that can be invoked with each response. - # - # @param requests the Enumerable of requests to send - # @return an Enumerator of requests to yield - def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop - replies = each_queued_msg - return replies if blk.nil? - replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join - end + # Begins orchestration of the Bidi stream for a client sending requests. + # + # The method either returns an Enumerator of the responses, or accepts a + # block that can be invoked with each response. + # + # @param requests the Enumerable of requests to send + # @return an Enumerator of requests to yield + def run_on_client(requests, &blk) + enq_th = start_write_loop(requests) + loop_th = start_read_loop + replies = each_queued_msg + return replies if blk.nil? + replies.each { |r| blk.call(r) } + enq_th.join + loop_th.join + end - # Begins orchestration of the Bidi stream for a server generating replies. - # - # N.B. gen_each_reply is a func(Enumerable) - # - # It takes an enumerable of requests as an arg, in case there is a - # relationship between the stream of requests and the stream of replies. - # - # This does not mean that must necessarily be one. E.g, the replies - # produced by gen_each_reply could ignore the received_msgs - # - # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) - replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join - end + # Begins orchestration of the Bidi stream for a server generating replies. + # + # N.B. gen_each_reply is a func(Enumerable) + # + # It takes an enumerable of requests as an arg, in case there is a + # relationship between the stream of requests and the stream of replies. + # + # This does not mean that must necessarily be one. E.g, the replies + # produced by gen_each_reply could ignore the received_msgs + # + # @param gen_each_reply [Proc] generates the BiDi stream replies. + def run_on_server(gen_each_reply) + replys = gen_each_reply.call(each_queued_msg) + enq_th = start_write_loop(replys, is_client: false) + loop_th = start_read_loop + loop_th.join + enq_th.join + end - private + private - END_OF_READS = :end_of_reads - END_OF_WRITES = :end_of_writes + END_OF_READS = :end_of_reads + END_OF_WRITES = :end_of_writes - # each_queued_msg yields each message on this instances readq - # - # - messages are added to the readq by #read_loop - # - iteration ends when the instance itself is added - def each_queued_msg - return enum_for(:each_queued_msg) unless block_given? - count = 0 - loop do - logger.debug("each_queued_msg: msg##{count}") - count += 1 - req = @readq.pop - throw req if req.is_a? StandardError - break if req.equal?(END_OF_READS) - yield req - end + # each_queued_msg yields each message on this instances readq + # + # - messages are added to the readq by #read_loop + # - iteration ends when the instance itself is added + def each_queued_msg + return enum_for(:each_queued_msg) unless block_given? + count = 0 + loop do + logger.debug("each_queued_msg: msg##{count}") + count += 1 + req = @readq.pop + throw req if req.is_a? StandardError + break if req.equal?(END_OF_READS) + yield req end + end - # during bidi-streaming, read the requests to send from a separate thread - # read so that read_loop does not block waiting for requests to read. - def start_write_loop(requests, is_client: true) - Thread.new do # TODO: run on a thread pool - write_tag = Object.new - begin - count = 0 - requests.each do |req| - count += 1 - payload = @marshal.call(req) - @call.start_write(Core::ByteBuffer.new(payload), write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close - end + # during bidi-streaming, read the requests to send from a separate thread + # read so that read_loop does not block waiting for requests to read. + def start_write_loop(requests, is_client: true) + Thread.new do # TODO: run on a thread pool + write_tag = Object.new + begin + count = 0 + requests.each do |req| + count += 1 + payload = @marshal.call(req) + @call.start_write(Core::ByteBuffer.new(payload), write_tag) + ev = @cq.pluck(write_tag, INFINITE_FUTURE) + begin + assert_event_type(ev, WRITE_ACCEPTED) + ensure + ev.close end - if is_client - @call.writes_done(write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close - end - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISHED) - ensure - ev.close - end - logger.debug('bidi-client: finished received') + end + if is_client + @call.writes_done(write_tag) + ev = @cq.pluck(write_tag, INFINITE_FUTURE) + begin + assert_event_type(ev, FINISH_ACCEPTED) + ensure + ev.close end - rescue StandardError => e - logger.warn('bidi: write_loop failed') - logger.warn(e) + logger.debug("bidi-client: sent #{count} reqs, waiting to finish") + ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) + begin + assert_event_type(ev, FINISHED) + ensure + ev.close + end + logger.debug('bidi-client: finished received') end + rescue StandardError => e + logger.warn('bidi: write_loop failed') + logger.warn(e) end end + end - # starts the read loop - def start_read_loop - Thread.new do - begin - read_tag = Object.new - count = 0 - - # queue the initial read before beginning the loop - loop do - logger.debug("waiting for read #{count}") - count += 1 - @call.start_read(read_tag) - ev = @cq.pluck(read_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) + # starts the read loop + def start_read_loop + Thread.new do + begin + read_tag = Object.new + count = 0 - # handle the next event. - if ev.result.nil? - @readq.push(END_OF_READS) - logger.debug('done reading!') - break - end + # queue the initial read before beginning the loop + loop do + logger.debug("waiting for read #{count}") + count += 1 + @call.start_read(read_tag) + ev = @cq.pluck(read_tag, INFINITE_FUTURE) + begin + assert_event_type(ev, READ) - # push the latest read onto the queue and continue reading - logger.debug("received req: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - @readq.push(res) - ensure - ev.close + # handle the next event. + if ev.result.nil? + @readq.push(END_OF_READS) + logger.debug('done reading!') + break end - end - rescue StandardError => e - logger.warn('bidi: read_loop failed') - logger.warn(e) - @readq.push(e) # let each_queued_msg terminate with this error + # push the latest read onto the queue and continue reading + logger.debug("received req: #{ev.result}") + res = @unmarshal.call(ev.result.to_s) + @readq.push(res) + ensure + ev.close + end end + + rescue StandardError => e + logger.warn('bidi: read_loop failed') + logger.warn(e) + @readq.push(e) # let each_queued_msg terminate with this error end end end diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 7e13de19ca5..1a4ed58fa3f 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -30,381 +30,379 @@ require 'grpc/generic/active_call' require 'xray/thread_dump_signal_handler' -module Google - # Google::RPC contains the General RPC module. - module RPC - # ClientStub represents an endpoint used to send requests to GRPC servers. - class ClientStub - include Core::StatusCodes +# GRPC contains the General RPC module. +module GRPC + # ClientStub represents an endpoint used to send requests to GRPC servers. + class ClientStub + include Core::StatusCodes - # Default deadline is 5 seconds. - DEFAULT_DEADLINE = 5 + # Default deadline is 5 seconds. + DEFAULT_DEADLINE = 5 - # Creates a new ClientStub. - # - # Minimally, a stub is created with the just the host of the gRPC service - # it wishes to access, e.g., - # - # my_stub = ClientStub.new(example.host.com:50505) - # - # Any arbitrary keyword arguments are treated as channel arguments used to - # configure the RPC connection to the host. - # - # There are some specific keyword args that are not used to configure the - # channel: - # - # - :channel_override - # when present, this must be a pre-created GRPC::Channel. If it's - # present the host and arbitrary keyword arg areignored, and the RPC - # connection uses this channel. - # - # - :deadline - # when present, this is the default deadline used for calls - # - # - :update_metadata - # when present, this a func that takes a hash and returns a hash - # it can be used to update metadata, i.e, remove, change or update - # amend metadata values. - # - # @param host [String] the host the stub connects to - # @param q [Core::CompletionQueue] used to wait for events - # @param channel_override [Core::Channel] a pre-created channel - # @param deadline [Number] the default deadline to use in requests - # @param creds [Core::Credentials] the channel - # @param update_metadata a func that updates metadata as described above - # @param kw [KeywordArgs]the channel arguments - def initialize(host, q, - channel_override:nil, - deadline: DEFAULT_DEADLINE, - creds: nil, - update_metadata: nil, - **kw) - unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - @queue = q + # Creates a new ClientStub. + # + # Minimally, a stub is created with the just the host of the gRPC service + # it wishes to access, e.g., + # + # my_stub = ClientStub.new(example.host.com:50505) + # + # Any arbitrary keyword arguments are treated as channel arguments used to + # configure the RPC connection to the host. + # + # There are some specific keyword args that are not used to configure the + # channel: + # + # - :channel_override + # when present, this must be a pre-created GRPC::Channel. If it's + # present the host and arbitrary keyword arg areignored, and the RPC + # connection uses this channel. + # + # - :deadline + # when present, this is the default deadline used for calls + # + # - :update_metadata + # when present, this a func that takes a hash and returns a hash + # it can be used to update metadata, i.e, remove, change or update + # amend metadata values. + # + # @param host [String] the host the stub connects to + # @param q [Core::CompletionQueue] used to wait for events + # @param channel_override [Core::Channel] a pre-created channel + # @param deadline [Number] the default deadline to use in requests + # @param creds [Core::Credentials] the channel + # @param update_metadata a func that updates metadata as described above + # @param kw [KeywordArgs]the channel arguments + def initialize(host, q, + channel_override:nil, + deadline: DEFAULT_DEADLINE, + creds: nil, + update_metadata: nil, + **kw) + unless q.is_a? Core::CompletionQueue + fail(ArgumentError, 'not a CompletionQueue') + end + @queue = q - # set the channel instance - if !channel_override.nil? - ch = channel_override - fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel + # set the channel instance + if !channel_override.nil? + ch = channel_override + fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel + else + if creds.nil? + ch = Core::Channel.new(host, kw) + elsif !creds.is_a?(Core::Credentials) + fail(ArgumentError, 'not a Credentials') else - if creds.nil? - ch = Core::Channel.new(host, kw) - elsif !creds.is_a?(Core::Credentials) - fail(ArgumentError, 'not a Credentials') - else - ch = Core::Channel.new(host, kw, creds) - end + ch = Core::Channel.new(host, kw, creds) end - @ch = ch - - @update_metadata = nil - unless update_metadata.nil? - unless update_metadata.is_a? Proc - fail(ArgumentError, 'update_metadata is not a Proc') - end - @update_metadata = update_metadata - end - - @host = host - @deadline = deadline end + @ch = ch - # request_response sends a request to a GRPC server, and returns the - # response. - # - # == Flow Control == - # This is a blocking call. - # - # * it does not return until a response is received. - # - # * the requests is sent only when GRPC core's flow control allows it to - # be sent. - # - # == Errors == - # An RuntimeError is raised if - # - # * the server responds with a non-OK status - # - # * the deadline is exceeded - # - # == Return Value == - # - # If return_op is false, the call returns the response - # - # If return_op is true, the call returns an Operation, calling execute - # on the Operation returns the response. - # - # == Keyword Args == - # - # Unspecified keyword arguments are treated as metadata to be sent to the - # server. - # - # @param method [String] the RPC method to call on the GRPC server - # @param req [Object] the request sent to the server - # @param marshal [Function] f(obj)->string that marshals requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds - # @param return_op [true|false] return an Operation if true - # @return [Object] the response received from the server - def request_response(method, req, marshal, unmarshal, deadline = nil, - return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) - return c.request_response(req, **md) unless return_op - - # return the operation view of the active_call; define #execute as a - # new method for this instance that invokes #request_response. - op = c.operation - op.define_singleton_method(:execute) do - c.request_response(req, **md) + @update_metadata = nil + unless update_metadata.nil? + unless update_metadata.is_a? Proc + fail(ArgumentError, 'update_metadata is not a Proc') end - op + @update_metadata = update_metadata end - # client_streamer sends a stream of requests to a GRPC server, and - # returns a single response. - # - # requests provides an 'iterable' of Requests. I.e. it follows Ruby's - # #each enumeration protocol. In the simplest case, requests will be an - # array of marshallable objects; in typical case it will be an Enumerable - # that allows dynamic construction of the marshallable objects. - # - # == Flow Control == - # This is a blocking call. - # - # * it does not return until a response is received. - # - # * each requests is sent only when GRPC core's flow control allows it to - # be sent. - # - # == Errors == - # An RuntimeError is raised if - # - # * the server responds with a non-OK status - # - # * the deadline is exceeded - # - # == Return Value == - # - # If return_op is false, the call consumes the requests and returns - # the response. - # - # If return_op is true, the call returns the response. - # - # == Keyword Args == - # - # Unspecified keyword arguments are treated as metadata to be sent to the - # server. - # - # @param method [String] the RPC method to call on the GRPC server - # @param requests [Object] an Enumerable of requests to send - # @param marshal [Function] f(obj)->string that marshals requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds - # @param return_op [true|false] return an Operation if true - # @return [Object|Operation] the response received from the server - def client_streamer(method, requests, marshal, unmarshal, deadline = nil, - return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) - return c.client_streamer(requests, **md) unless return_op + @host = host + @deadline = deadline + end + + # request_response sends a request to a GRPC server, and returns the + # response. + # + # == Flow Control == + # This is a blocking call. + # + # * it does not return until a response is received. + # + # * the requests is sent only when GRPC core's flow control allows it to + # be sent. + # + # == Errors == + # An RuntimeError is raised if + # + # * the server responds with a non-OK status + # + # * the deadline is exceeded + # + # == Return Value == + # + # If return_op is false, the call returns the response + # + # If return_op is true, the call returns an Operation, calling execute + # on the Operation returns the response. + # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # + # @param method [String] the RPC method to call on the GRPC server + # @param req [Object] the request sent to the server + # @param marshal [Function] f(obj)->string that marshals requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Numeric] (optional) the max completion time in seconds + # @param return_op [true|false] return an Operation if true + # @return [Object] the response received from the server + def request_response(method, req, marshal, unmarshal, deadline = nil, + return_op: false, **kw) + c = new_active_call(method, marshal, unmarshal, deadline || @deadline) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.request_response(req, **md) unless return_op - # return the operation view of the active_call; define #execute as a - # new method for this instance that invokes #client_streamer. - op = c.operation - op.define_singleton_method(:execute) do - c.client_streamer(requests, **md) - end - op + # return the operation view of the active_call; define #execute as a + # new method for this instance that invokes #request_response. + op = c.operation + op.define_singleton_method(:execute) do + c.request_response(req, **md) end + op + end - # server_streamer sends one request to the GRPC server, which yields a - # stream of responses. - # - # responses provides an enumerator over the streamed responses, i.e. it - # follows Ruby's #each iteration protocol. The enumerator blocks while - # waiting for each response, stops when the server signals that no - # further responses will be supplied. If the implicit block is provided, - # it is executed with each response as the argument and no result is - # returned. - # - # == Flow Control == - # This is a blocking call. - # - # * the request is sent only when GRPC core's flow control allows it to - # be sent. - # - # * the request will not complete until the server sends the final - # response followed by a status message. - # - # == Errors == - # An RuntimeError is raised if - # - # * the server responds with a non-OK status when any response is - # * retrieved - # - # * the deadline is exceeded - # - # == Return Value == - # - # if the return_op is false, the return value is an Enumerator of the - # results, unless a block is provided, in which case the block is - # executed with each response. - # - # if return_op is true, the function returns an Operation whose #execute - # method runs server streamer call. Again, Operation#execute either - # calls the given block with each response or returns an Enumerator of the - # responses. - # - # == Keyword Args == - # - # Unspecified keyword arguments are treated as metadata to be sent to the - # server. - # - # @param method [String] the RPC method to call on the GRPC server - # @param req [Object] the request sent to the server - # @param marshal [Function] f(obj)->string that marshals requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds - # @param return_op [true|false]return an Operation if true - # @param blk [Block] when provided, is executed for each response - # @return [Enumerator|Operation|nil] as discussed above - def server_streamer(method, req, marshal, unmarshal, deadline = nil, - return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) - return c.server_streamer(req, **md, &blk) unless return_op + # client_streamer sends a stream of requests to a GRPC server, and + # returns a single response. + # + # requests provides an 'iterable' of Requests. I.e. it follows Ruby's + # #each enumeration protocol. In the simplest case, requests will be an + # array of marshallable objects; in typical case it will be an Enumerable + # that allows dynamic construction of the marshallable objects. + # + # == Flow Control == + # This is a blocking call. + # + # * it does not return until a response is received. + # + # * each requests is sent only when GRPC core's flow control allows it to + # be sent. + # + # == Errors == + # An RuntimeError is raised if + # + # * the server responds with a non-OK status + # + # * the deadline is exceeded + # + # == Return Value == + # + # If return_op is false, the call consumes the requests and returns + # the response. + # + # If return_op is true, the call returns the response. + # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # + # @param method [String] the RPC method to call on the GRPC server + # @param requests [Object] an Enumerable of requests to send + # @param marshal [Function] f(obj)->string that marshals requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Numeric] the max completion time in seconds + # @param return_op [true|false] return an Operation if true + # @return [Object|Operation] the response received from the server + def client_streamer(method, requests, marshal, unmarshal, deadline = nil, + return_op: false, **kw) + c = new_active_call(method, marshal, unmarshal, deadline || @deadline) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.client_streamer(requests, **md) unless return_op - # return the operation view of the active_call; define #execute - # as a new method for this instance that invokes #server_streamer - op = c.operation - op.define_singleton_method(:execute) do - c.server_streamer(req, **md, &blk) - end - op + # return the operation view of the active_call; define #execute as a + # new method for this instance that invokes #client_streamer. + op = c.operation + op.define_singleton_method(:execute) do + c.client_streamer(requests, **md) end + op + end - # bidi_streamer sends a stream of requests to the GRPC server, and yields - # a stream of responses. - # - # This method takes an Enumerable of requests, and returns and enumerable - # of responses. - # - # == requests == - # - # requests provides an 'iterable' of Requests. I.e. it follows Ruby's - # #each enumeration protocol. In the simplest case, requests will be an - # array of marshallable objects; in typical case it will be an - # Enumerable that allows dynamic construction of the marshallable - # objects. - # - # == responses == - # - # This is an enumerator of responses. I.e, its #next method blocks - # waiting for the next response. Also, if at any point the block needs - # to consume all the remaining responses, this can be done using #each or - # #collect. Calling #each or #collect should only be done if - # the_call#writes_done has been called, otherwise the block will loop - # forever. - # - # == Flow Control == - # This is a blocking call. - # - # * the call completes when the next call to provided block returns - # * [False] - # - # * the execution block parameters are two objects for sending and - # receiving responses, each of which blocks waiting for flow control. - # E.g, calles to bidi_call#remote_send will wait until flow control - # allows another write before returning; and obviously calls to - # responses#next block until the next response is available. - # - # == Termination == - # - # As well as sending and receiving messages, the block passed to the - # function is also responsible for: - # - # * calling bidi_call#writes_done to indicate no further reqs will be - # sent. - # - # * returning false if once the bidi stream is functionally completed. - # - # Note that response#next will indicate that there are no further - # responses by throwing StopIteration, but can only happen either - # if bidi_call#writes_done is called. - # - # To terminate the RPC correctly the block: - # - # * must call bidi#writes_done and then - # - # * either return false as soon as there is no need for other responses - # - # * loop on responses#next until no further responses are available - # - # == Errors == - # An RuntimeError is raised if - # - # * the server responds with a non-OK status when any response is - # * retrieved - # - # * the deadline is exceeded - # - # - # == Keyword Args == - # - # Unspecified keyword arguments are treated as metadata to be sent to the - # server. - # - # == Return Value == - # - # if the return_op is false, the return value is an Enumerator of the - # results, unless a block is provided, in which case the block is - # executed with each response. - # - # if return_op is true, the function returns an Operation whose #execute - # method runs the Bidi call. Again, Operation#execute either calls a - # given block with each response or returns an Enumerator of the - # responses. - # - # @param method [String] the RPC method to call on the GRPC server - # @param requests [Object] an Enumerable of requests to send - # @param marshal [Function] f(obj)->string that marshals requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds - # @param blk [Block] when provided, is executed for each response - # @param return_op [true|false] return an Operation if true - # @return [Enumerator|nil|Operation] as discussed above - def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil, + # server_streamer sends one request to the GRPC server, which yields a + # stream of responses. + # + # responses provides an enumerator over the streamed responses, i.e. it + # follows Ruby's #each iteration protocol. The enumerator blocks while + # waiting for each response, stops when the server signals that no + # further responses will be supplied. If the implicit block is provided, + # it is executed with each response as the argument and no result is + # returned. + # + # == Flow Control == + # This is a blocking call. + # + # * the request is sent only when GRPC core's flow control allows it to + # be sent. + # + # * the request will not complete until the server sends the final + # response followed by a status message. + # + # == Errors == + # An RuntimeError is raised if + # + # * the server responds with a non-OK status when any response is + # * retrieved + # + # * the deadline is exceeded + # + # == Return Value == + # + # if the return_op is false, the return value is an Enumerator of the + # results, unless a block is provided, in which case the block is + # executed with each response. + # + # if return_op is true, the function returns an Operation whose #execute + # method runs server streamer call. Again, Operation#execute either + # calls the given block with each response or returns an Enumerator of the + # responses. + # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # + # @param method [String] the RPC method to call on the GRPC server + # @param req [Object] the request sent to the server + # @param marshal [Function] f(obj)->string that marshals requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Numeric] the max completion time in seconds + # @param return_op [true|false]return an Operation if true + # @param blk [Block] when provided, is executed for each response + # @return [Enumerator|Operation|nil] as discussed above + def server_streamer(method, req, marshal, unmarshal, deadline = nil, return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) - return c.bidi_streamer(requests, **md, &blk) unless return_op + c = new_active_call(method, marshal, unmarshal, deadline || @deadline) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.server_streamer(req, **md, &blk) unless return_op - # return the operation view of the active_call; define #execute - # as a new method for this instance that invokes #bidi_streamer - op = c.operation - op.define_singleton_method(:execute) do - c.bidi_streamer(requests, **md, &blk) - end - op + # return the operation view of the active_call; define #execute + # as a new method for this instance that invokes #server_streamer + op = c.operation + op.define_singleton_method(:execute) do + c.server_streamer(req, **md, &blk) end + op + end - private + # bidi_streamer sends a stream of requests to the GRPC server, and yields + # a stream of responses. + # + # This method takes an Enumerable of requests, and returns and enumerable + # of responses. + # + # == requests == + # + # requests provides an 'iterable' of Requests. I.e. it follows Ruby's + # #each enumeration protocol. In the simplest case, requests will be an + # array of marshallable objects; in typical case it will be an + # Enumerable that allows dynamic construction of the marshallable + # objects. + # + # == responses == + # + # This is an enumerator of responses. I.e, its #next method blocks + # waiting for the next response. Also, if at any point the block needs + # to consume all the remaining responses, this can be done using #each or + # #collect. Calling #each or #collect should only be done if + # the_call#writes_done has been called, otherwise the block will loop + # forever. + # + # == Flow Control == + # This is a blocking call. + # + # * the call completes when the next call to provided block returns + # * [False] + # + # * the execution block parameters are two objects for sending and + # receiving responses, each of which blocks waiting for flow control. + # E.g, calles to bidi_call#remote_send will wait until flow control + # allows another write before returning; and obviously calls to + # responses#next block until the next response is available. + # + # == Termination == + # + # As well as sending and receiving messages, the block passed to the + # function is also responsible for: + # + # * calling bidi_call#writes_done to indicate no further reqs will be + # sent. + # + # * returning false if once the bidi stream is functionally completed. + # + # Note that response#next will indicate that there are no further + # responses by throwing StopIteration, but can only happen either + # if bidi_call#writes_done is called. + # + # To terminate the RPC correctly the block: + # + # * must call bidi#writes_done and then + # + # * either return false as soon as there is no need for other responses + # + # * loop on responses#next until no further responses are available + # + # == Errors == + # An RuntimeError is raised if + # + # * the server responds with a non-OK status when any response is + # * retrieved + # + # * the deadline is exceeded + # + # + # == Keyword Args == + # + # Unspecified keyword arguments are treated as metadata to be sent to the + # server. + # + # == Return Value == + # + # if the return_op is false, the return value is an Enumerator of the + # results, unless a block is provided, in which case the block is + # executed with each response. + # + # if return_op is true, the function returns an Operation whose #execute + # method runs the Bidi call. Again, Operation#execute either calls a + # given block with each response or returns an Enumerator of the + # responses. + # + # @param method [String] the RPC method to call on the GRPC server + # @param requests [Object] an Enumerable of requests to send + # @param marshal [Function] f(obj)->string that marshals requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [Numeric] (optional) the max completion time in seconds + # @param blk [Block] when provided, is executed for each response + # @param return_op [true|false] return an Operation if true + # @return [Enumerator|nil|Operation] as discussed above + def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil, + return_op: false, **kw, &blk) + c = new_active_call(method, marshal, unmarshal, deadline || @deadline) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + return c.bidi_streamer(requests, **md, &blk) unless return_op - # Creates a new active stub - # - # @param ch [GRPC::Channel] the channel used to create the stub. - # @param marshal [Function] f(obj)->string that marshals requests - # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [TimeConst] - def new_active_call(ch, marshal, unmarshal, deadline = nil) - absolute_deadline = Core::TimeConsts.from_relative_time(deadline) - call = @ch.create_call(ch, @host, absolute_deadline) - ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline, - started: false) + # return the operation view of the active_call; define #execute + # as a new method for this instance that invokes #bidi_streamer + op = c.operation + op.define_singleton_method(:execute) do + c.bidi_streamer(requests, **md, &blk) end + op + end + + private + + # Creates a new active stub + # + # @param ch [GRPC::Channel] the channel used to create the stub. + # @param marshal [Function] f(obj)->string that marshals requests + # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param deadline [TimeConst] + def new_active_call(ch, marshal, unmarshal, deadline = nil) + absolute_deadline = Core::TimeConsts.from_relative_time(deadline) + call = @ch.create_call(ch, @host, absolute_deadline) + ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline, + started: false) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 876397a6e70..6d8bdeb3cc3 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -29,123 +29,122 @@ require 'grpc/grpc' -module Google - module RPC - # RpcDesc is a Descriptor of an RPC method. - class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, - :unmarshal_method) - include Core::StatusCodes +# GRPC contains the General RPC module. +module GRPC + # RpcDesc is a Descriptor of an RPC method. + class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, + :unmarshal_method) + include Core::StatusCodes - # Used to wrap a message class to indicate that it needs to be streamed. - class Stream - attr_accessor :type + # Used to wrap a message class to indicate that it needs to be streamed. + class Stream + attr_accessor :type - def initialize(type) - @type = type - end + def initialize(type) + @type = type end + end - # @return [Proc] { |instance| marshalled(instance) } - def marshal_proc - proc { |o| o.class.method(marshal_method).call(o).to_s } - end + # @return [Proc] { |instance| marshalled(instance) } + def marshal_proc + proc { |o| o.class.method(marshal_method).call(o).to_s } + end - # @param [:input, :output] target determines whether to produce the an - # unmarshal Proc for the rpc input parameter or - # its output parameter - # - # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } - def unmarshal_proc(target) - fail ArgumentError unless [:input, :output].include?(target) - unmarshal_class = method(target).call - unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream - proc { |o| unmarshal_class.method(unmarshal_method).call(o) } - end + # @param [:input, :output] target determines whether to produce the an + # unmarshal Proc for the rpc input parameter or + # its output parameter + # + # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } + def unmarshal_proc(target) + fail ArgumentError unless [:input, :output].include?(target) + unmarshal_class = method(target).call + unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream + proc { |o| unmarshal_class.method(unmarshal_method).call(o) } + end - def run_server_method(active_call, mth) - # While a server method is running, it might be cancelled, its deadline - # might be reached, the handler could throw an unknown error, or a - # well-behaved handler could throw a StatusError. - if request_response? - req = active_call.remote_read - resp = mth.call(req, active_call.single_req_view) - active_call.remote_send(resp) - elsif client_streamer? - resp = mth.call(active_call.multi_req_view) - active_call.remote_send(resp) - elsif server_streamer? - req = active_call.remote_read - replys = mth.call(req, active_call.single_req_view) - replys.each { |r| active_call.remote_send(r) } - else # is a bidi_stream - active_call.run_server_bidi(mth) - end - send_status(active_call, OK, 'OK') - rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. - logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) - rescue Core::CallError => e - # This is raised by GRPC internals but should rarely, if ever happen. - # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") - rescue OutOfTime - # This is raised when active_call#method.call exceeeds the deadline - # event. Send a status of deadline exceeded - logger.warn("late call: #{active_call}") - send_status(active_call, DEADLINE_EXCEEDED, 'late') - rescue Core::EventError => e - # This is raised by GRPC internals but should rarely, if ever happen. - # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") - rescue StandardError => e - # This will usuaally be an unhandled error in the handling code. - # Send back a UNKNOWN status to the client - logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") - logger.warn(e) - send_status(active_call, UNKNOWN, 'no reason given') + def run_server_method(active_call, mth) + # While a server method is running, it might be cancelled, its deadline + # might be reached, the handler could throw an unknown error, or a + # well-behaved handler could throw a StatusError. + if request_response? + req = active_call.remote_read + resp = mth.call(req, active_call.single_req_view) + active_call.remote_send(resp) + elsif client_streamer? + resp = mth.call(active_call.multi_req_view) + active_call.remote_send(resp) + elsif server_streamer? + req = active_call.remote_read + replys = mth.call(req, active_call.single_req_view) + replys.each { |r| active_call.remote_send(r) } + else # is a bidi_stream + active_call.run_server_bidi(mth) end + send_status(active_call, OK, 'OK') + rescue BadStatus => e + # this is raised by handlers that want GRPC to send an application + # error code and detail message. + logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") + send_status(active_call, e.code, e.details) + rescue Core::CallError => e + # This is raised by GRPC internals but should rarely, if ever happen. + # Log it, but don't notify the other endpoint.. + logger.warn("failed call: #{active_call}\n#{e}") + rescue OutOfTime + # This is raised when active_call#method.call exceeeds the deadline + # event. Send a status of deadline exceeded + logger.warn("late call: #{active_call}") + send_status(active_call, DEADLINE_EXCEEDED, 'late') + rescue Core::EventError => e + # This is raised by GRPC internals but should rarely, if ever happen. + # Log it, but don't notify the other endpoint.. + logger.warn("failed call: #{active_call}\n#{e}") + rescue StandardError => e + # This will usuaally be an unhandled error in the handling code. + # Send back a UNKNOWN status to the client + logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") + logger.warn(e) + send_status(active_call, UNKNOWN, 'no reason given') + end - def assert_arity_matches(mth) - if request_response? || server_streamer? - if mth.arity != 2 - fail arity_error(mth, 2, "should be #{mth.name}(req, call)") - end - else - if mth.arity != 1 - fail arity_error(mth, 1, "should be #{mth.name}(call)") - end + def assert_arity_matches(mth) + if request_response? || server_streamer? + if mth.arity != 2 + fail arity_error(mth, 2, "should be #{mth.name}(req, call)") + end + else + if mth.arity != 1 + fail arity_error(mth, 1, "should be #{mth.name}(call)") end end + end - def request_response? - !input.is_a?(Stream) && !output.is_a?(Stream) - end + def request_response? + !input.is_a?(Stream) && !output.is_a?(Stream) + end - def client_streamer? - input.is_a?(Stream) && !output.is_a?(Stream) - end + def client_streamer? + input.is_a?(Stream) && !output.is_a?(Stream) + end - def server_streamer? - !input.is_a?(Stream) && output.is_a?(Stream) - end + def server_streamer? + !input.is_a?(Stream) && output.is_a?(Stream) + end - def bidi_streamer? - input.is_a?(Stream) && output.is_a?(Stream) - end + def bidi_streamer? + input.is_a?(Stream) && output.is_a?(Stream) + end - def arity_error(mth, want, msg) - "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" - end + def arity_error(mth, want, msg) + "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" + end - def send_status(active_client, code, details) - details = 'Not sure why' if details.nil? - active_client.send_status(code, details) - rescue StandardError => e - logger.warn("Could not send status #{code}:#{details}") - logger.warn(e) - end + def send_status(active_client, code, details) + details = 'Not sure why' if details.nil? + active_client.send_status(code, details) + rescue StandardError => e + logger.warn("Could not send status #{code}:#{details}") + logger.warn(e) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 40c5ec118e3..dd40eba57f0 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -33,372 +33,370 @@ require 'grpc/generic/service' require 'thread' require 'xray/thread_dump_signal_handler' -module Google - # Google::RPC contains the General RPC module. - module RPC - # RpcServer hosts a number of services and makes them available on the - # network. - class RpcServer - include Core::CompletionType - include Core::TimeConsts - extend ::Forwardable - - def_delegators :@server, :add_http2_port - - # Default thread pool size is 3 - DEFAULT_POOL_SIZE = 3 - - # Default max_waiting_requests size is 20 - DEFAULT_MAX_WAITING_REQUESTS = 20 - - # Creates a new RpcServer. - # - # The RPC server is configured using keyword arguments. - # - # There are some specific keyword args used to configure the RpcServer - # instance, however other arbitrary are allowed and when present are used - # to configure the listeninng connection set up by the RpcServer. - # - # * server_override: which if passed must be a [GRPC::Core::Server]. When - # present. - # - # * poll_period: when present, the server polls for new events with this - # period - # - # * pool_size: the size of the thread pool the server uses to run its - # threads - # - # * completion_queue_override: when supplied, this will be used as the - # completion_queue that the server uses to receive network events, - # otherwise its creates a new instance itself - # - # * creds: [GRPC::Core::ServerCredentials] - # the credentials used to secure the server - # - # * max_waiting_requests: the maximum number of requests that are not - # being handled to allow. When this limit is exceeded, the server responds - # with not available to new requests - def initialize(pool_size:DEFAULT_POOL_SIZE, - max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:INFINITE_FUTURE, - completion_queue_override:nil, - creds:nil, - server_override:nil, - **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end +# GRPC contains the General RPC module. +module GRPC + # RpcServer hosts a number of services and makes them available on the + # network. + class RpcServer + include Core::CompletionType + include Core::TimeConsts + extend ::Forwardable + + def_delegators :@server, :add_http2_port + + # Default thread pool size is 3 + DEFAULT_POOL_SIZE = 3 + + # Default max_waiting_requests size is 20 + DEFAULT_MAX_WAITING_REQUESTS = 20 + + # Creates a new RpcServer. + # + # The RPC server is configured using keyword arguments. + # + # There are some specific keyword args used to configure the RpcServer + # instance, however other arbitrary are allowed and when present are used + # to configure the listeninng connection set up by the RpcServer. + # + # * server_override: which if passed must be a [GRPC::Core::Server]. When + # present. + # + # * poll_period: when present, the server polls for new events with this + # period + # + # * pool_size: the size of the thread pool the server uses to run its + # threads + # + # * completion_queue_override: when supplied, this will be used as the + # completion_queue that the server uses to receive network events, + # otherwise its creates a new instance itself + # + # * creds: [GRPC::Core::ServerCredentials] + # the credentials used to secure the server + # + # * max_waiting_requests: the maximum number of requests that are not + # being handled to allow. When this limit is exceeded, the server responds + # with not available to new requests + def initialize(pool_size:DEFAULT_POOL_SIZE, + max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, + poll_period:INFINITE_FUTURE, + completion_queue_override:nil, + creds:nil, + server_override:nil, + **kw) + if completion_queue_override.nil? + cq = Core::CompletionQueue.new + else + cq = completion_queue_override + unless cq.is_a? Core::CompletionQueue + fail(ArgumentError, 'not a CompletionQueue') end - @cq = cq + end + @cq = cq - if server_override.nil? - if creds.nil? - srv = Core::Server.new(@cq, kw) - elsif !creds.is_a? Core::ServerCredentials - fail(ArgumentError, 'not a ServerCredentials') - else - srv = Core::Server.new(@cq, kw, creds) - end + if server_override.nil? + if creds.nil? + srv = Core::Server.new(@cq, kw) + elsif !creds.is_a? Core::ServerCredentials + fail(ArgumentError, 'not a ServerCredentials') else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server + srv = Core::Server.new(@cq, kw, creds) end - @server = srv - - @pool_size = pool_size - @max_waiting_requests = max_waiting_requests - @poll_period = poll_period - @run_mutex = Mutex.new - @run_cond = ConditionVariable.new - @pool = Pool.new(@pool_size) + else + srv = server_override + fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server end + @server = srv + + @pool_size = pool_size + @max_waiting_requests = max_waiting_requests + @poll_period = poll_period + @run_mutex = Mutex.new + @run_cond = ConditionVariable.new + @pool = Pool.new(@pool_size) + end - # stops a running server - # - # the call has no impact if the server is already stopped, otherwise - # server's current call loop is it's last. - def stop - return unless @running - @stopped = true - @pool.stop - end + # stops a running server + # + # the call has no impact if the server is already stopped, otherwise + # server's current call loop is it's last. + def stop + return unless @running + @stopped = true + @pool.stop + end - # determines if the server is currently running - def running? - @running ||= false - end + # determines if the server is currently running + def running? + @running ||= false + end - # Is called from other threads to wait for #run to start up the server. - # - # If run has not been called, this returns immediately. - # - # @param timeout [Numeric] number of seconds to wait - # @result [true, false] true if the server is running, false otherwise - def wait_till_running(timeout = 0.1) - end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 - while Time.now < end_time - @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? - sleep(sleep_period) - end - running? + # Is called from other threads to wait for #run to start up the server. + # + # If run has not been called, this returns immediately. + # + # @param timeout [Numeric] number of seconds to wait + # @result [true, false] true if the server is running, false otherwise + def wait_till_running(timeout = 0.1) + end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 + while Time.now < end_time + @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? + sleep(sleep_period) end + running? + end - # determines if the server is currently stopped - def stopped? - @stopped ||= false - end + # determines if the server is currently stopped + def stopped? + @stopped ||= false + end - # handle registration of classes - # - # service is either a class that includes GRPC::GenericService and whose - # #new function can be called without argument or any instance of such a - # class. - # - # E.g, after - # - # class Divider - # include GRPC::GenericService - # rpc :div DivArgs, DivReply # single request, single response - # def initialize(optional_arg='default option') # no args - # ... - # end - # - # srv = GRPC::RpcServer.new(...) - # - # # Either of these works - # - # srv.handle(Divider) - # - # # or - # - # srv.handle(Divider.new('replace optional arg')) - # - # It raises RuntimeError: - # - if service is not valid service class or object - # - its handler methods are already registered - # - if the server is already running - # - # @param service [Object|Class] a service class or object as described - # above - def handle(service) - fail 'cannot add services if the server is running' if running? - fail 'cannot add services if the server is stopped' if stopped? - cls = service.is_a?(Class) ? service : service.class - assert_valid_service_class(cls) - add_rpc_descs_for(service) - end + # handle registration of classes + # + # service is either a class that includes GRPC::GenericService and whose + # #new function can be called without argument or any instance of such a + # class. + # + # E.g, after + # + # class Divider + # include GRPC::GenericService + # rpc :div DivArgs, DivReply # single request, single response + # def initialize(optional_arg='default option') # no args + # ... + # end + # + # srv = GRPC::RpcServer.new(...) + # + # # Either of these works + # + # srv.handle(Divider) + # + # # or + # + # srv.handle(Divider.new('replace optional arg')) + # + # It raises RuntimeError: + # - if service is not valid service class or object + # - its handler methods are already registered + # - if the server is already running + # + # @param service [Object|Class] a service class or object as described + # above + def handle(service) + fail 'cannot add services if the server is running' if running? + fail 'cannot add services if the server is stopped' if stopped? + cls = service.is_a?(Class) ? service : service.class + assert_valid_service_class(cls) + add_rpc_descs_for(service) + end - # runs the server - # - # - if no rpc_descs are registered, this exits immediately, otherwise it - # continues running permanently and does not return until program exit. - # - # - #running? returns true after this is called, until #stop cause the - # the server to stop. - def run - if rpc_descs.size == 0 - logger.warn('did not run as no services were present') - return - end - @run_mutex.synchronize do - @running = true - @run_cond.signal + # runs the server + # + # - if no rpc_descs are registered, this exits immediately, otherwise it + # continues running permanently and does not return until program exit. + # + # - #running? returns true after this is called, until #stop cause the + # the server to stop. + def run + if rpc_descs.size == 0 + logger.warn('did not run as no services were present') + return + end + @run_mutex.synchronize do + @running = true + @run_cond.signal + end + @pool.start + @server.start + server_tag = Object.new + until stopped? + @server.request_call(server_tag) + ev = @cq.pluck(server_tag, @poll_period) + next if ev.nil? + if ev.type != SERVER_RPC_NEW + logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") + ev.close + next end - @pool.start - @server.start - server_tag = Object.new - until stopped? - @server.request_call(server_tag) - ev = @cq.pluck(server_tag, @poll_period) - next if ev.nil? - if ev.type != SERVER_RPC_NEW - logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") - ev.close - next - end - c = new_active_server_call(ev.call, ev.result) - unless c.nil? - mth = ev.result.method.to_sym - ev.close - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end + c = new_active_server_call(ev.call, ev.result) + unless c.nil? + mth = ev.result.method.to_sym + ev.close + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) end end - @running = false end + @running = false + end - def new_active_server_call(call, new_server_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately - finished_tag = Object.new - call_queue = Core::CompletionQueue.new - call.metadata = new_server_rpc.metadata # store the metadata - call.server_accept(call_queue, finished_tag) - call.server_end_initial_metadata - - # Send UNAVAILABLE if there are too many unprocessed jobs - jobs_count, max = @pool.jobs_waiting, @max_waiting_requests - logger.info("waiting: #{jobs_count}, max: #{max}") - if @pool.jobs_waiting > @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::UNAVAILABLE, '') - return nil - end - - # Send NOT_FOUND if the method does not exist - mth = new_server_rpc.method.to_sym - unless rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::NOT_FOUND, '') - return nil - end + def new_active_server_call(call, new_server_rpc) + # Accept the call. This is necessary even if a status is to be sent + # back immediately + finished_tag = Object.new + call_queue = Core::CompletionQueue.new + call.metadata = new_server_rpc.metadata # store the metadata + call.server_accept(call_queue, finished_tag) + call.server_end_initial_metadata + + # Send UNAVAILABLE if there are too many unprocessed jobs + jobs_count, max = @pool.jobs_waiting, @max_waiting_requests + logger.info("waiting: #{jobs_count}, max: #{max}") + if @pool.jobs_waiting > @max_waiting_requests + logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(call, call_queue, noop, noop, + new_server_rpc.deadline, + finished_tag: finished_tag) + c.send_status(StatusCodes::UNAVAILABLE, '') + return nil + end - # Create the ActiveCall - rpc_desc = rpc_descs[mth] - logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") - ActiveCall.new(call, call_queue, - rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - new_server_rpc.deadline, finished_tag: finished_tag) + # Send NOT_FOUND if the method does not exist + mth = new_server_rpc.method.to_sym + unless rpc_descs.key?(mth) + logger.warn("NOT_FOUND: #{new_server_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(call, call_queue, noop, noop, + new_server_rpc.deadline, + finished_tag: finished_tag) + c.send_status(StatusCodes::NOT_FOUND, '') + return nil end - # Pool is a simple thread pool for running server requests. - class Pool - def initialize(size) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - end + # Create the ActiveCall + rpc_desc = rpc_descs[mth] + logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") + ActiveCall.new(call, call_queue, + rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), + new_server_rpc.deadline, finished_tag: finished_tag) + end - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end + # Pool is a simple thread pool for running server requests. + class Pool + def initialize(size) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + end - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) end end + end - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end + # removes the threads from workers, and signal when all the + # threads are complete. + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 end - @workers << next_thread end + @workers << next_thread end + end - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true - # TODO: allow configuration of the keepalive period - keep_alive = 5 - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 - end + # TODO: allow configuration of the keepalive period + keep_alive = 5 + @stop_mutex.synchronize do + @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 + end - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.warn("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end + # Forcibly shutdown any threads that are still alive. + if @workers.size > 0 + logger.warn("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) end end - - logger.info('stopped, all workers are shutdown') end + + logger.info('stopped, all workers are shutdown') end + end - protected + protected - def rpc_descs - @rpc_descs ||= {} - end + def rpc_descs + @rpc_descs ||= {} + end - def rpc_handlers - @rpc_handlers ||= {} - end + def rpc_handlers + @rpc_handlers ||= {} + end - private + private - def assert_valid_service_class(cls) - unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" - end - if cls.rpc_descs.size == 0 - fail "#{cls} should specify some rpc descriptions" - end - cls.assert_rpc_descs_have_methods + def assert_valid_service_class(cls) + unless cls.include?(GenericService) + fail "#{cls} should 'include GenericService'" + end + if cls.rpc_descs.size == 0 + fail "#{cls} should specify some rpc descriptions" end + cls.assert_rpc_descs_have_methods + end - def add_rpc_descs_for(service) - cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers - cls.rpc_descs.each_pair do |name, spec| - route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + def add_rpc_descs_for(service) + cls = service.is_a?(Class) ? service : service.class + specs = rpc_descs + handlers = rpc_handlers + cls.rpc_descs.each_pair do |name, spec| + route = "/#{cls.service_name}/#{name}".to_sym + if specs.key? route + fail "Cannot add rpc #{route} from #{spec}, already registered" + else + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index ff37617ccfa..f0b85a0fa20 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -48,188 +48,186 @@ class String end end -module Google - # Google::RPC contains the General RPC module. - module RPC - # Provides behaviour used to implement schema-derived service classes. - # - # Is intended to be used to support both client and server - # IDL-schema-derived servers. - module GenericService - # Used to indicate that a name has already been specified - class DuplicateRpcName < StandardError - def initialize(name) - super("rpc (#{name}) is already defined") - end +# GRPC contains the General RPC module. +module GRPC + # Provides behaviour used to implement schema-derived service classes. + # + # Is intended to be used to support both client and server + # IDL-schema-derived servers. + module GenericService + # Used to indicate that a name has already been specified + class DuplicateRpcName < StandardError + def initialize(name) + super("rpc (#{name}) is already defined") end + end - # Provides a simple DSL to describe RPC services. + # Provides a simple DSL to describe RPC services. + # + # E.g, a Maths service that uses the serializable messages DivArgs, + # DivReply and Num might define its endpoint uses the following way: + # + # rpc :div DivArgs, DivReply # single request, single response + # rpc :sum stream(Num), Num # streamed input, single response + # rpc :fib FibArgs, stream(Num) # single request, streamed response + # rpc :div_many stream(DivArgs), stream(DivReply) + # # streamed req and resp + # + # Each 'rpc' adds an RpcDesc to classes including this module, and + # #assert_rpc_descs_have_methods is used to ensure the including class + # provides methods with signatures that support all the descriptors. + module Dsl + # This configures the method names that the serializable message + # implementation uses to marshal and unmarshal messages. # - # E.g, a Maths service that uses the serializable messages DivArgs, - # DivReply and Num might define its endpoint uses the following way: + # - unmarshal_class method must be a class method on the serializable + # message type that takes a string (byte stream) and produces and object # - # rpc :div DivArgs, DivReply # single request, single response - # rpc :sum stream(Num), Num # streamed input, single response - # rpc :fib FibArgs, stream(Num) # single request, streamed response - # rpc :div_many stream(DivArgs), stream(DivReply) - # # streamed req and resp + # - marshal_class_method is called on a serializable message instance + # and produces a serialized string. # - # Each 'rpc' adds an RpcDesc to classes including this module, and - # #assert_rpc_descs_have_methods is used to ensure the including class - # provides methods with signatures that support all the descriptors. - module Dsl - # This configures the method names that the serializable message - # implementation uses to marshal and unmarshal messages. - # - # - unmarshal_class method must be a class method on the serializable - # message type that takes a string (byte stream) and produces and object - # - # - marshal_class_method is called on a serializable message instance - # and produces a serialized string. - # - # The Dsl verifies that the types in the descriptor have both the - # unmarshal and marshal methods. - attr_writer(:marshal_class_method, :unmarshal_class_method) - - # This allows configuration of the service name. - attr_accessor(:service_name) - - # Adds an RPC spec. - # - # Takes the RPC name and the classes representing the types to be - # serialized, and adds them to the including classes rpc_desc hash. - # - # input and output should both have the methods #marshal and #unmarshal - # that are responsible for writing and reading an object instance from a - # byte buffer respectively. - # - # @param name [String] the name of the rpc - # @param input [Object] the input parameter's class - # @param output [Object] the output parameter's class - def rpc(name, input, output) - fail(DuplicateRpcName, name) if rpc_descs.key? name - assert_can_marshal(input) - assert_can_marshal(output) - rpc_descs[name] = RpcDesc.new(name, input, output, - marshal_class_method, - unmarshal_class_method) - end + # The Dsl verifies that the types in the descriptor have both the + # unmarshal and marshal methods. + attr_writer(:marshal_class_method, :unmarshal_class_method) - def inherited(subclass) - # Each subclass should have a distinct class variable with its own - # rpc_descs - subclass.rpc_descs.merge!(rpc_descs) - subclass.service_name = service_name - end + # This allows configuration of the service name. + attr_accessor(:service_name) - # the name of the instance method used to marshal events to a byte - # stream. - def marshal_class_method - @marshal_class_method ||= :marshal - end + # Adds an RPC spec. + # + # Takes the RPC name and the classes representing the types to be + # serialized, and adds them to the including classes rpc_desc hash. + # + # input and output should both have the methods #marshal and #unmarshal + # that are responsible for writing and reading an object instance from a + # byte buffer respectively. + # + # @param name [String] the name of the rpc + # @param input [Object] the input parameter's class + # @param output [Object] the output parameter's class + def rpc(name, input, output) + fail(DuplicateRpcName, name) if rpc_descs.key? name + assert_can_marshal(input) + assert_can_marshal(output) + rpc_descs[name] = RpcDesc.new(name, input, output, + marshal_class_method, + unmarshal_class_method) + end - # the name of the class method used to unmarshal from a byte stream. - def unmarshal_class_method - @unmarshal_class_method ||= :unmarshal - end + def inherited(subclass) + # Each subclass should have a distinct class variable with its own + # rpc_descs + subclass.rpc_descs.merge!(rpc_descs) + subclass.service_name = service_name + end - def assert_can_marshal(cls) - cls = cls.type if cls.is_a? RpcDesc::Stream - mth = unmarshal_class_method - unless cls.methods.include? mth - fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") - end - mth = marshal_class_method - return if cls.methods.include? mth + # the name of the instance method used to marshal events to a byte + # stream. + def marshal_class_method + @marshal_class_method ||= :marshal + end + + # the name of the class method used to unmarshal from a byte stream. + def unmarshal_class_method + @unmarshal_class_method ||= :unmarshal + end + + def assert_can_marshal(cls) + cls = cls.type if cls.is_a? RpcDesc::Stream + mth = unmarshal_class_method + unless cls.methods.include? mth fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") end + mth = marshal_class_method + return if cls.methods.include? mth + fail(ArgumentError, "#{cls} needs #{cls}.#{mth}") + end - # @param cls [Class] the class of a serializable type - # @return cls wrapped in a RpcDesc::Stream - def stream(cls) - assert_can_marshal(cls) - RpcDesc::Stream.new(cls) - end + # @param cls [Class] the class of a serializable type + # @return cls wrapped in a RpcDesc::Stream + def stream(cls) + assert_can_marshal(cls) + RpcDesc::Stream.new(cls) + end - # the RpcDescs defined for this GenericService, keyed by name. - def rpc_descs - @rpc_descs ||= {} - end + # the RpcDescs defined for this GenericService, keyed by name. + def rpc_descs + @rpc_descs ||= {} + end - # Creates a rpc client class with methods for accessing the methods - # currently in rpc_descs. - def rpc_stub_class - descs = rpc_descs - route_prefix = service_name - Class.new(ClientStub) do - # @param host [String] the host the stub connects to - # @param kw [KeywordArgs] the channel arguments, plus any optional - # args for configuring the client's channel - def initialize(host, **kw) - super(host, Core::CompletionQueue.new, **kw) - end + # Creates a rpc client class with methods for accessing the methods + # currently in rpc_descs. + def rpc_stub_class + descs = rpc_descs + route_prefix = service_name + Class.new(ClientStub) do + # @param host [String] the host the stub connects to + # @param kw [KeywordArgs] the channel arguments, plus any optional + # args for configuring the client's channel + def initialize(host, **kw) + super(host, Core::CompletionQueue.new, **kw) + end - # Used define_method to add a method for each rpc_desc. Each method - # calls the base class method for the given descriptor. - descs.each_pair do |name, desc| - mth_name = name.to_s.underscore.to_sym - marshal = desc.marshal_proc - unmarshal = desc.unmarshal_proc(:output) - route = "/#{route_prefix}/#{name}" - if desc.request_response? - define_method(mth_name) do |req, deadline = nil| - logger.debug("calling #{@host}:#{route}") - request_response(route, req, marshal, unmarshal, deadline) - end - elsif desc.client_streamer? - define_method(mth_name) do |reqs, deadline = nil| - logger.debug("calling #{@host}:#{route}") - client_streamer(route, reqs, marshal, unmarshal, deadline) - end - elsif desc.server_streamer? - define_method(mth_name) do |req, deadline = nil, &blk| - logger.debug("calling #{@host}:#{route}") - server_streamer(route, req, marshal, unmarshal, deadline, - &blk) - end - else # is a bidi_stream - define_method(mth_name) do |reqs, deadline = nil, &blk| - logger.debug("calling #{@host}:#{route}") - bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk) - end + # Used define_method to add a method for each rpc_desc. Each method + # calls the base class method for the given descriptor. + descs.each_pair do |name, desc| + mth_name = name.to_s.underscore.to_sym + marshal = desc.marshal_proc + unmarshal = desc.unmarshal_proc(:output) + route = "/#{route_prefix}/#{name}" + if desc.request_response? + define_method(mth_name) do |req, deadline = nil| + logger.debug("calling #{@host}:#{route}") + request_response(route, req, marshal, unmarshal, deadline) + end + elsif desc.client_streamer? + define_method(mth_name) do |reqs, deadline = nil| + logger.debug("calling #{@host}:#{route}") + client_streamer(route, reqs, marshal, unmarshal, deadline) + end + elsif desc.server_streamer? + define_method(mth_name) do |req, deadline = nil, &blk| + logger.debug("calling #{@host}:#{route}") + server_streamer(route, req, marshal, unmarshal, deadline, + &blk) + end + else # is a bidi_stream + define_method(mth_name) do |reqs, deadline = nil, &blk| + logger.debug("calling #{@host}:#{route}") + bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk) end end end end + end - # Asserts that the appropriate methods are defined for each added rpc - # spec. Is intended to aid verifying that server classes are correctly - # implemented. - def assert_rpc_descs_have_methods - rpc_descs.each_pair do |m, spec| - mth_name = m.to_s.underscore.to_sym - unless instance_methods.include?(mth_name) - fail "#{self} does not provide instance method '#{mth_name}'" - end - spec.assert_arity_matches(instance_method(mth_name)) + # Asserts that the appropriate methods are defined for each added rpc + # spec. Is intended to aid verifying that server classes are correctly + # implemented. + def assert_rpc_descs_have_methods + rpc_descs.each_pair do |m, spec| + mth_name = m.to_s.underscore.to_sym + unless instance_methods.include?(mth_name) + fail "#{self} does not provide instance method '#{mth_name}'" end + spec.assert_arity_matches(instance_method(mth_name)) end end + end - def self.included(o) - o.extend(Dsl) - # Update to the use the service name including module. Proivde a default - # that can be nil e,g. when modules are declared dynamically. - return unless o.service_name.nil? - if o.name.nil? - o.service_name = 'GenericService' + def self.included(o) + o.extend(Dsl) + # Update to the use the service name including module. Proivde a default + # that can be nil e,g. when modules are declared dynamically. + return unless o.service_name.nil? + if o.name.nil? + o.service_name = 'GenericService' + else + modules = o.name.split('::') + if modules.length > 2 + o.service_name = modules[modules.length - 2] else - modules = o.name.split('::') - if modules.length > 2 - o.service_name = modules[modules.length - 2] - else - o.service_name = modules.first - end + o.service_name = modules.first end end end diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb index 6442f23e895..cf280feae64 100644 --- a/src/ruby/lib/grpc/logconfig.rb +++ b/src/ruby/lib/grpc/logconfig.rb @@ -35,6 +35,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout Logging.logger.root.level = :info # TODO: provide command-line configuration for logging -Logging.logger['Google::RPC'].level = :debug -Logging.logger['Google::RPC::ActiveCall'].level = :info -Logging.logger['Google::RPC::BidiCall'].level = :info +Logging.logger['GRPC'].level = :debug +Logging.logger['GRPC::ActiveCall'].level = :info +Logging.logger['GRPC::BidiCall'].level = :info diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index dd526e583a6..0107bc4ada0 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -27,9 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -module Google - # Google::RPC contains the General RPC module. - module RPC - VERSION = '0.0.1' - end +# GRPC contains the General RPC module. +module GRPC + VERSION = '0.0.1' end diff --git a/src/ruby/spec/auth/compute_engine_spec.rb b/src/ruby/spec/auth/compute_engine_spec.rb index 9e0b4660fa5..c43214d0861 100644 --- a/src/ruby/spec/auth/compute_engine_spec.rb +++ b/src/ruby/spec/auth/compute_engine_spec.rb @@ -36,9 +36,9 @@ require 'faraday' require 'grpc/auth/compute_engine' require 'spec_helper' -describe Google::RPC::Auth::GCECredentials do +describe GRPC::Auth::GCECredentials do MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token' - GCECredentials = Google::RPC::Auth::GCECredentials + GCECredentials = GRPC::Auth::GCECredentials before(:example) do @client = GCECredentials.new diff --git a/src/ruby/spec/auth/service_account_spec.rb b/src/ruby/spec/auth/service_account_spec.rb index cbc6a73ac20..2f14a1ae053 100644 --- a/src/ruby/spec/auth/service_account_spec.rb +++ b/src/ruby/spec/auth/service_account_spec.rb @@ -38,7 +38,7 @@ require 'multi_json' require 'openssl' require 'spec_helper' -describe Google::RPC::Auth::ServiceAccountCredentials do +describe GRPC::Auth::ServiceAccountCredentials do before(:example) do @key = OpenSSL::PKey::RSA.new(2048) cred_json = { @@ -49,7 +49,7 @@ describe Google::RPC::Auth::ServiceAccountCredentials do type: 'service_account' } cred_json_text = MultiJson.dump(cred_json) - @client = Google::RPC::Auth::ServiceAccountCredentials.new( + @client = GRPC::Auth::ServiceAccountCredentials.new( 'https://www.googleapis.com/auth/userinfo.profile', StringIO.new(cred_json_text)) end