|
|
|
@ -37,7 +37,7 @@ |
|
|
|
|
* This module contains the factory method for creating Client classes, and the |
|
|
|
|
* method calling code for all types of methods. |
|
|
|
|
* |
|
|
|
|
* For example, to create a client and call a method on it: |
|
|
|
|
* @example <caption>Create a client and call a method on it</caption> |
|
|
|
|
* |
|
|
|
|
* var proto_obj = grpc.load(proto_file_path); |
|
|
|
|
* var Client = proto_obj.package.subpackage.ServiceName; |
|
|
|
@ -68,14 +68,33 @@ var Duplex = stream.Duplex; |
|
|
|
|
var util = require('util'); |
|
|
|
|
var version = require('../../../package.json').version; |
|
|
|
|
|
|
|
|
|
util.inherits(ClientUnaryCall, EventEmitter); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* An EventEmitter. Used for unary calls |
|
|
|
|
* @constructor |
|
|
|
|
* @extends external:EventEmitter |
|
|
|
|
* @param {grpc.Call} call The call object associated with the request |
|
|
|
|
*/ |
|
|
|
|
function ClientUnaryCall(call) { |
|
|
|
|
EventEmitter.call(this); |
|
|
|
|
this.call = call; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ClientWritableStream, Writable); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* A stream that the client can write to. Used for calls that are streaming from |
|
|
|
|
* the client side. |
|
|
|
|
* @constructor |
|
|
|
|
* @extends external:Writable |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#cancel as |
|
|
|
|
* module:src/client~ClientWritableStream#cancel |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#getPeer as |
|
|
|
|
* module:src/client~ClientWritableStream#getPeer |
|
|
|
|
* @param {grpc.Call} call The call object to send data with |
|
|
|
|
* @param {function(*):Buffer=} serialize Serialization function for writes. |
|
|
|
|
* @param {module:src/common~serialize=} [serialize=identity] Serialization |
|
|
|
|
* function for writes. |
|
|
|
|
*/ |
|
|
|
|
function ClientWritableStream(call, serialize) { |
|
|
|
|
Writable.call(this, {objectMode: true}); |
|
|
|
@ -134,8 +153,14 @@ util.inherits(ClientReadableStream, Readable); |
|
|
|
|
* A stream that the client can read from. Used for calls that are streaming |
|
|
|
|
* from the server side. |
|
|
|
|
* @constructor |
|
|
|
|
* @extends external:Readable |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#cancel as |
|
|
|
|
* module:src/client~ClientReadableStream#cancel |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#getPeer as |
|
|
|
|
* module:src/client~ClientReadableStream#getPeer |
|
|
|
|
* @param {grpc.Call} call The call object to read data with |
|
|
|
|
* @param {function(Buffer):*=} deserialize Deserialization function for reads |
|
|
|
|
* @param {module:src/common~deserialize=} [deserialize=identity] |
|
|
|
|
* Deserialization function for reads |
|
|
|
|
*/ |
|
|
|
|
function ClientReadableStream(call, deserialize) { |
|
|
|
|
Readable.call(this, {objectMode: true}); |
|
|
|
@ -155,6 +180,7 @@ function ClientReadableStream(call, deserialize) { |
|
|
|
|
* parameter indicates that the call should end with that status. status |
|
|
|
|
* defaults to OK if not provided. |
|
|
|
|
* @param {Object!} status The status that the call should end with |
|
|
|
|
* @access private |
|
|
|
|
*/ |
|
|
|
|
function _readsDone(status) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
@ -173,6 +199,7 @@ ClientReadableStream.prototype._readsDone = _readsDone; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Called to indicate that we have received a status from the server. |
|
|
|
|
* @access private |
|
|
|
|
*/ |
|
|
|
|
function _receiveStatus(status) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
@ -185,6 +212,7 @@ ClientReadableStream.prototype._receiveStatus = _receiveStatus; |
|
|
|
|
/** |
|
|
|
|
* If we have both processed all incoming messages and received the status from |
|
|
|
|
* the server, emit the status. Otherwise, do nothing. |
|
|
|
|
* @access private |
|
|
|
|
*/ |
|
|
|
|
function _emitStatusIfDone() { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
@ -270,10 +298,16 @@ util.inherits(ClientDuplexStream, Duplex); |
|
|
|
|
* A stream that the client can read from or write to. Used for calls with |
|
|
|
|
* duplex streaming. |
|
|
|
|
* @constructor |
|
|
|
|
* @extends external:Duplex |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#cancel as |
|
|
|
|
* module:src/client~ClientDuplexStream#cancel |
|
|
|
|
* @borrows module:src/client~ClientUnaryCall#getPeer as |
|
|
|
|
* module:src/client~ClientDuplexStream#getPeer |
|
|
|
|
* @param {grpc.Call} call Call object to proxy |
|
|
|
|
* @param {function(*):Buffer=} serialize Serialization function for requests |
|
|
|
|
* @param {function(Buffer):*=} deserialize Deserialization function for |
|
|
|
|
* responses |
|
|
|
|
* @param {module:src/common~serialize=} [serialize=identity] Serialization |
|
|
|
|
* function for requests |
|
|
|
|
* @param {module:src/common~deserialize=} [deserialize=identity] |
|
|
|
|
* Deserialization function for responses |
|
|
|
|
*/ |
|
|
|
|
function ClientDuplexStream(call, serialize, deserialize) { |
|
|
|
|
Duplex.call(this, {objectMode: true}); |
|
|
|
@ -300,12 +334,14 @@ ClientDuplexStream.prototype._write = _write; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Cancel the ongoing call |
|
|
|
|
* @alias module:src/client~ClientUnaryCall#cancel |
|
|
|
|
*/ |
|
|
|
|
function cancel() { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
this.call.cancel(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ClientUnaryCall.prototype.cancel = cancel; |
|
|
|
|
ClientReadableStream.prototype.cancel = cancel; |
|
|
|
|
ClientWritableStream.prototype.cancel = cancel; |
|
|
|
|
ClientDuplexStream.prototype.cancel = cancel; |
|
|
|
@ -313,21 +349,49 @@ ClientDuplexStream.prototype.cancel = cancel; |
|
|
|
|
/** |
|
|
|
|
* Get the endpoint this call/stream is connected to. |
|
|
|
|
* @return {string} The URI of the endpoint |
|
|
|
|
* @alias module:src/client~ClientUnaryCall#getPeer |
|
|
|
|
*/ |
|
|
|
|
function getPeer() { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
return this.call.getPeer(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ClientUnaryCall.prototype.getPeer = getPeer; |
|
|
|
|
ClientReadableStream.prototype.getPeer = getPeer; |
|
|
|
|
ClientWritableStream.prototype.getPeer = getPeer; |
|
|
|
|
ClientDuplexStream.prototype.getPeer = getPeer; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Any client call type |
|
|
|
|
* @typedef {(ClientUnaryCall|ClientReadableStream| |
|
|
|
|
* ClientWritableStream|ClientDuplexStream)} |
|
|
|
|
* module:src/client~Call |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Options that can be set on a call. |
|
|
|
|
* @typedef {Object} module:src/client~CallOptions |
|
|
|
|
* @property {(date|number)} deadline The deadline for the entire call to |
|
|
|
|
* complete. A value of Infinity indicates that no deadline should be set. |
|
|
|
|
* @property {(string)} host Server hostname to set on the call. Only meaningful |
|
|
|
|
* if different from the server address used to construct the client. |
|
|
|
|
* @property {module:src/client~Call} parent Parent call. Used in servers when |
|
|
|
|
* making a call as part of the process of handling a call. Used to |
|
|
|
|
* propagate some information automatically, as specified by |
|
|
|
|
* propagate_flags. |
|
|
|
|
* @property {number} propagate_flags Indicates which properties of a parent |
|
|
|
|
* call should propagate to this call. Bitwise combination of flags in |
|
|
|
|
* [grpc.propagate]{@link module:index.propagate}. |
|
|
|
|
* @property {module:src/credentials~CallCredentials} credentials The |
|
|
|
|
* credentials that should be used to make this particular call. |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a call object built with the provided options. Keys for options are |
|
|
|
|
* 'deadline', which takes a date or number, and 'host', which takes a string |
|
|
|
|
* and overrides the hostname to connect to. |
|
|
|
|
* @param {Object} options Options map. |
|
|
|
|
* @access private |
|
|
|
|
* @param {module:src/client~CallOptions=} options Options object. |
|
|
|
|
*/ |
|
|
|
|
function getCall(channel, method, options) { |
|
|
|
|
var deadline; |
|
|
|
@ -354,315 +418,380 @@ function getCall(channel, method, options) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a function that can make unary requests to the specified method. |
|
|
|
|
* @param {string} method The name of the method to request |
|
|
|
|
* @param {function(*):Buffer} serialize The serialization function for inputs |
|
|
|
|
* @param {function(Buffer)} deserialize The deserialization function for |
|
|
|
|
* outputs |
|
|
|
|
* @return {Function} makeUnaryRequest |
|
|
|
|
* A generic gRPC client. Primarily useful as a base class for generated clients |
|
|
|
|
* @alias module:src/client.Client |
|
|
|
|
* @constructor |
|
|
|
|
* @param {string} address Server address to connect to |
|
|
|
|
* @param {module:src/credentials~ChannelCredentials} credentials Credentials to |
|
|
|
|
* use to connect to the server |
|
|
|
|
* @param {Object} options Options to apply to channel creation |
|
|
|
|
*/ |
|
|
|
|
function makeUnaryRequestFunction(method, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Make a unary request with this method on the given channel with the given |
|
|
|
|
* argument, callback, etc. |
|
|
|
|
* @this {Client} Client object. Must have a channel member. |
|
|
|
|
* @param {*} argument The argument to the call. Should be serializable with |
|
|
|
|
* serialize |
|
|
|
|
* @param {Metadata=} metadata Metadata to add to the call |
|
|
|
|
* @param {Object=} options Options map |
|
|
|
|
* @param {function(?Error, value=)} callback The callback to for when the |
|
|
|
|
* response is received |
|
|
|
|
* @return {EventEmitter} An event emitter for stream related events |
|
|
|
|
var Client = exports.Client = function Client(address, credentials, options) { |
|
|
|
|
if (!options) { |
|
|
|
|
options = {}; |
|
|
|
|
} |
|
|
|
|
/* Append the grpc-node user agent string after the application user agent |
|
|
|
|
* string, and put the combination at the beginning of the user agent string |
|
|
|
|
*/ |
|
|
|
|
function makeUnaryRequest(argument, metadata, options, callback) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. This allows for simple handling of optional arguments in the |
|
|
|
|
* middle of the argument list, and also provides type checking. */ |
|
|
|
|
var args = arguejs({argument: null, metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object], callback: Function}, arguments); |
|
|
|
|
var emitter = new EventEmitter(); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
emitter.cancel = function cancel() { |
|
|
|
|
call.cancel(); |
|
|
|
|
}; |
|
|
|
|
emitter.getPeer = function getPeer() { |
|
|
|
|
return call.getPeer(); |
|
|
|
|
}; |
|
|
|
|
var client_batch = {}; |
|
|
|
|
var message = serialize(args.argument); |
|
|
|
|
if (args.options) { |
|
|
|
|
message.grpcWriteFlags = args.options.flags; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
client_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
client_batch[grpc.opType.SEND_MESSAGE] = message; |
|
|
|
|
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(client_batch, function(err, response) { |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
var status = response.status; |
|
|
|
|
var error; |
|
|
|
|
var deserialized; |
|
|
|
|
emitter.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
if (status.code === grpc.status.OK) { |
|
|
|
|
if (err) { |
|
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
|
|
args.callback(err); |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
try { |
|
|
|
|
deserialized = deserialize(response.read); |
|
|
|
|
} catch (e) { |
|
|
|
|
/* Change status to indicate bad server response. This will result |
|
|
|
|
* in passing an error to the callback */ |
|
|
|
|
status = { |
|
|
|
|
code: grpc.status.INTERNAL, |
|
|
|
|
details: 'Failed to parse server response' |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (status.code !== grpc.status.OK) { |
|
|
|
|
error = new Error(status.details); |
|
|
|
|
error.code = status.code; |
|
|
|
|
error.metadata = status.metadata; |
|
|
|
|
args.callback(error); |
|
|
|
|
} else { |
|
|
|
|
args.callback(null, deserialized); |
|
|
|
|
} |
|
|
|
|
emitter.emit('status', status); |
|
|
|
|
}); |
|
|
|
|
return emitter; |
|
|
|
|
if (options['grpc.primary_user_agent']) { |
|
|
|
|
options['grpc.primary_user_agent'] += ' '; |
|
|
|
|
} else { |
|
|
|
|
options['grpc.primary_user_agent'] = ''; |
|
|
|
|
} |
|
|
|
|
return makeUnaryRequest; |
|
|
|
|
} |
|
|
|
|
options['grpc.primary_user_agent'] += 'grpc-node/' + version; |
|
|
|
|
/* Private fields use $ as a prefix instead of _ because it is an invalid |
|
|
|
|
* prefix of a method name */ |
|
|
|
|
this.$channel = new grpc.Channel(address, credentials, options); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a function that can make client stream requests to the specified method. |
|
|
|
|
* @typedef {Error} module:src/client.Client~ServiceError |
|
|
|
|
* @property {number} code The error code, a key of |
|
|
|
|
* [grpc.status]{@link module:src/client.status} |
|
|
|
|
* @property {module:metadata.Metadata} metadata Metadata sent with the status |
|
|
|
|
* by the server, if any |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @callback module:src/client.Client~requestCallback |
|
|
|
|
* @param {?module:src/client.Client~ServiceError} error The error, if the call |
|
|
|
|
* failed |
|
|
|
|
* @param {*} value The response value, if the call succeeded |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Make a unary request to the given method, using the given serialize |
|
|
|
|
* and deserialize functions, with the given argument. |
|
|
|
|
* @param {string} method The name of the method to request |
|
|
|
|
* @param {function(*):Buffer} serialize The serialization function for inputs |
|
|
|
|
* @param {function(Buffer)} deserialize The deserialization function for |
|
|
|
|
* outputs |
|
|
|
|
* @return {Function} makeClientStreamRequest |
|
|
|
|
* @param {module:src/common~serialize} serialize The serialization function for |
|
|
|
|
* inputs |
|
|
|
|
* @param {module:src/common~deserialize} deserialize The deserialization |
|
|
|
|
* function for outputs |
|
|
|
|
* @param {*} argument The argument to the call. Should be serializable with |
|
|
|
|
* serialize |
|
|
|
|
* @param {module:src/metadata.Metadata=} metadata Metadata to add to the call |
|
|
|
|
* @param {module:src/client~CallOptions=} options Options map |
|
|
|
|
* @param {module:src/client.Client~requestCallback} callback The callback to |
|
|
|
|
* for when the response is received |
|
|
|
|
* @return {EventEmitter} An event emitter for stream related events |
|
|
|
|
*/ |
|
|
|
|
function makeClientStreamRequestFunction(method, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Make a client stream request with this method on the given channel with the |
|
|
|
|
* given callback, etc. |
|
|
|
|
* @this {Client} Client object. Must have a channel member. |
|
|
|
|
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
|
|
|
|
* call |
|
|
|
|
* @param {Object=} options Options map |
|
|
|
|
* @param {function(?Error, value=)} callback The callback to for when the |
|
|
|
|
* response is received |
|
|
|
|
* @return {EventEmitter} An event emitter for stream related events |
|
|
|
|
*/ |
|
|
|
|
function makeClientStreamRequest(metadata, options, callback) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. This allows for simple handling of optional arguments in the |
|
|
|
|
* middle of the argument list, and also provides type checking. */ |
|
|
|
|
var args = arguejs({metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object], callback: Function}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientWritableStream(call, serialize); |
|
|
|
|
var metadata_batch = {}; |
|
|
|
|
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
call.startBatch(metadata_batch, function(err, response) { |
|
|
|
|
Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, |
|
|
|
|
argument, metadata, options, |
|
|
|
|
callback) { |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. This allows for simple handling of optional arguments in the |
|
|
|
|
* middle of the argument list, and also provides type checking. */ |
|
|
|
|
var args = arguejs({method: String, serialize: Function, |
|
|
|
|
deserialize: Function, |
|
|
|
|
argument: null, metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object], callback: Function}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
var emitter = new ClientUnaryCall(call); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var client_batch = {}; |
|
|
|
|
var message = serialize(args.argument); |
|
|
|
|
if (args.options) { |
|
|
|
|
message.grpcWriteFlags = args.options.flags; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
client_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
client_batch[grpc.opType.SEND_MESSAGE] = message; |
|
|
|
|
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(client_batch, function(err, response) { |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
var status = response.status; |
|
|
|
|
var error; |
|
|
|
|
var deserialized; |
|
|
|
|
emitter.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
if (status.code === grpc.status.OK) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
|
|
args.callback(err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var client_batch = {}; |
|
|
|
|
client_batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(client_batch, function(err, response) { |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
var status = response.status; |
|
|
|
|
var error; |
|
|
|
|
var deserialized; |
|
|
|
|
if (status.code === grpc.status.OK) { |
|
|
|
|
if (err) { |
|
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
|
|
args.callback(err); |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
try { |
|
|
|
|
deserialized = deserialize(response.read); |
|
|
|
|
} catch (e) { |
|
|
|
|
/* Change status to indicate bad server response. This will result |
|
|
|
|
* in passing an error to the callback */ |
|
|
|
|
status = { |
|
|
|
|
code: grpc.status.INTERNAL, |
|
|
|
|
details: 'Failed to parse server response' |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (status.code !== grpc.status.OK) { |
|
|
|
|
error = new Error(response.status.details); |
|
|
|
|
error.code = status.code; |
|
|
|
|
error.metadata = status.metadata; |
|
|
|
|
args.callback(error); |
|
|
|
|
} else { |
|
|
|
|
args.callback(null, deserialized); |
|
|
|
|
try { |
|
|
|
|
deserialized = deserialize(response.read); |
|
|
|
|
} catch (e) { |
|
|
|
|
/* Change status to indicate bad server response. This will result |
|
|
|
|
* in passing an error to the callback */ |
|
|
|
|
status = { |
|
|
|
|
code: grpc.status.INTERNAL, |
|
|
|
|
details: 'Failed to parse server response' |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
stream.emit('status', status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
} |
|
|
|
|
return makeClientStreamRequest; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (status.code !== grpc.status.OK) { |
|
|
|
|
error = new Error(status.details); |
|
|
|
|
error.code = status.code; |
|
|
|
|
error.metadata = status.metadata; |
|
|
|
|
args.callback(error); |
|
|
|
|
} else { |
|
|
|
|
args.callback(null, deserialized); |
|
|
|
|
} |
|
|
|
|
emitter.emit('status', status); |
|
|
|
|
}); |
|
|
|
|
return emitter; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a function that can make server stream requests to the specified method. |
|
|
|
|
* Make a client stream request to the given method, using the given serialize |
|
|
|
|
* and deserialize functions, with the given argument. |
|
|
|
|
* @param {string} method The name of the method to request |
|
|
|
|
* @param {function(*):Buffer} serialize The serialization function for inputs |
|
|
|
|
* @param {function(Buffer)} deserialize The deserialization function for |
|
|
|
|
* outputs |
|
|
|
|
* @return {Function} makeServerStreamRequest |
|
|
|
|
* @param {module:src/common~serialize} serialize The serialization function for |
|
|
|
|
* inputs |
|
|
|
|
* @param {module:src/common~deserialize} deserialize The deserialization |
|
|
|
|
* function for outputs |
|
|
|
|
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value |
|
|
|
|
* pairs to add to the call |
|
|
|
|
* @param {module:src/client~CallOptions=} options Options map |
|
|
|
|
* @param {Client~requestCallback} callback The callback to for when the |
|
|
|
|
* response is received |
|
|
|
|
* @return {module:src/client~ClientWritableStream} An event emitter for stream |
|
|
|
|
* related events |
|
|
|
|
*/ |
|
|
|
|
function makeServerStreamRequestFunction(method, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Make a server stream request with this method on the given channel with the |
|
|
|
|
* given argument, etc. |
|
|
|
|
* @this {SurfaceClient} Client object. Must have a channel member. |
|
|
|
|
* @param {*} argument The argument to the call. Should be serializable with |
|
|
|
|
* serialize |
|
|
|
|
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
|
|
|
|
* call |
|
|
|
|
* @param {Object} options Options map |
|
|
|
|
* @return {EventEmitter} An event emitter for stream related events |
|
|
|
|
*/ |
|
|
|
|
function makeServerStreamRequest(argument, metadata, options) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. */ |
|
|
|
|
var args = arguejs({argument: null, metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object]}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientReadableStream(call, deserialize); |
|
|
|
|
var start_batch = {}; |
|
|
|
|
var message = serialize(args.argument); |
|
|
|
|
if (args.options) { |
|
|
|
|
message.grpcWriteFlags = args.options.flags; |
|
|
|
|
Client.prototype.makeClientStreamRequest = function(method, serialize, |
|
|
|
|
deserialize, metadata, |
|
|
|
|
options, callback) { |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. This allows for simple handling of optional arguments in the |
|
|
|
|
* middle of the argument list, and also provides type checking. */ |
|
|
|
|
var args = arguejs({method:String, serialize: Function, |
|
|
|
|
deserialize: Function, |
|
|
|
|
metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object], callback: Function}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientWritableStream(call, serialize); |
|
|
|
|
var metadata_batch = {}; |
|
|
|
|
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
call.startBatch(metadata_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
start_batch[grpc.opType.SEND_MESSAGE] = message; |
|
|
|
|
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
|
|
|
|
call.startBatch(start_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var status_batch = {}; |
|
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(status_batch, function(err, response) { |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var client_batch = {}; |
|
|
|
|
client_batch[grpc.opType.RECV_MESSAGE] = true; |
|
|
|
|
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(client_batch, function(err, response) { |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
var status = response.status; |
|
|
|
|
var error; |
|
|
|
|
var deserialized; |
|
|
|
|
if (status.code === grpc.status.OK) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
|
|
args.callback(err); |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
try { |
|
|
|
|
deserialized = deserialize(response.read); |
|
|
|
|
} catch (e) { |
|
|
|
|
/* Change status to indicate bad server response. This will result |
|
|
|
|
* in passing an error to the callback */ |
|
|
|
|
status = { |
|
|
|
|
code: grpc.status.INTERNAL, |
|
|
|
|
details: 'Failed to parse server response' |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
stream._receiveStatus(response.status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
} |
|
|
|
|
return makeServerStreamRequest; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (status.code !== grpc.status.OK) { |
|
|
|
|
error = new Error(response.status.details); |
|
|
|
|
error.code = status.code; |
|
|
|
|
error.metadata = status.metadata; |
|
|
|
|
args.callback(error); |
|
|
|
|
} else { |
|
|
|
|
args.callback(null, deserialized); |
|
|
|
|
} |
|
|
|
|
stream.emit('status', status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a function that can make bidirectional stream requests to the specified |
|
|
|
|
* method. |
|
|
|
|
* Make a server stream request to the given method, with the given serialize |
|
|
|
|
* and deserialize function, using the given argument |
|
|
|
|
* @param {string} method The name of the method to request |
|
|
|
|
* @param {function(*):Buffer} serialize The serialization function for inputs |
|
|
|
|
* @param {function(Buffer)} deserialize The deserialization function for |
|
|
|
|
* outputs |
|
|
|
|
* @return {Function} makeBidiStreamRequest |
|
|
|
|
* @param {module:src/common~serialize} serialize The serialization function for |
|
|
|
|
* inputs |
|
|
|
|
* @param {module:src/common~deserialize} deserialize The deserialization |
|
|
|
|
* function for outputs |
|
|
|
|
* @param {*} argument The argument to the call. Should be serializable with |
|
|
|
|
* serialize |
|
|
|
|
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value |
|
|
|
|
* pairs to add to the call |
|
|
|
|
* @param {module:src/client~CallOptions=} options Options map |
|
|
|
|
* @return {module:src/client~ClientReadableStream} An event emitter for stream |
|
|
|
|
* related events |
|
|
|
|
*/ |
|
|
|
|
function makeBidiStreamRequestFunction(method, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Make a bidirectional stream request with this method on the given channel. |
|
|
|
|
* @this {SurfaceClient} Client object. Must have a channel member. |
|
|
|
|
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
|
|
|
|
* call |
|
|
|
|
* @param {Options} options Options map |
|
|
|
|
* @return {EventEmitter} An event emitter for stream related events |
|
|
|
|
*/ |
|
|
|
|
function makeBidiStreamRequest(metadata, options) { |
|
|
|
|
/* jshint validthis: true */ |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. */ |
|
|
|
|
var args = arguejs({metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object]}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientDuplexStream(call, serialize, deserialize); |
|
|
|
|
var start_batch = {}; |
|
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
call.startBatch(start_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var status_batch = {}; |
|
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(status_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
stream._receiveStatus(response.status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
Client.prototype.makeServerStreamRequest = function(method, serialize, |
|
|
|
|
deserialize, argument, |
|
|
|
|
metadata, options) { |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. */ |
|
|
|
|
var args = arguejs({method:String, serialize: Function, |
|
|
|
|
deserialize: Function, |
|
|
|
|
argument: null, metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object]}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientReadableStream(call, deserialize); |
|
|
|
|
var start_batch = {}; |
|
|
|
|
var message = serialize(args.argument); |
|
|
|
|
if (args.options) { |
|
|
|
|
message.grpcWriteFlags = args.options.flags; |
|
|
|
|
} |
|
|
|
|
return makeBidiStreamRequest; |
|
|
|
|
} |
|
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
start_batch[grpc.opType.SEND_MESSAGE] = message; |
|
|
|
|
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
|
|
|
|
call.startBatch(start_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var status_batch = {}; |
|
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(status_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
stream._receiveStatus(response.status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Make a bidirectional stream request with this method on the given channel. |
|
|
|
|
* @param {string} method The name of the method to request |
|
|
|
|
* @param {module:src/common~serialize} serialize The serialization function for |
|
|
|
|
* inputs |
|
|
|
|
* @param {module:src/common~deserialize} deserialize The deserialization |
|
|
|
|
* function for outputs |
|
|
|
|
* @param {module:src/metadata.Metadata=} metadata Array of metadata key/value |
|
|
|
|
* pairs to add to the call |
|
|
|
|
* @param {module:src/client~CallOptions=} options Options map |
|
|
|
|
* @return {module:src/client~ClientDuplexStream} An event emitter for stream |
|
|
|
|
* related events |
|
|
|
|
*/ |
|
|
|
|
Client.prototype.makeBidiStreamRequest = function(method, serialize, |
|
|
|
|
deserialize, metadata, |
|
|
|
|
options) { |
|
|
|
|
/* While the arguments are listed in the function signature, those variables |
|
|
|
|
* are not used directly. Instead, ArgueJS processes the arguments |
|
|
|
|
* object. */ |
|
|
|
|
var args = arguejs({method:String, serialize: Function, |
|
|
|
|
deserialize: Function, |
|
|
|
|
metadata: [Metadata, new Metadata()], |
|
|
|
|
options: [Object]}, arguments); |
|
|
|
|
var call = getCall(this.$channel, method, args.options); |
|
|
|
|
metadata = args.metadata.clone(); |
|
|
|
|
var stream = new ClientDuplexStream(call, serialize, deserialize); |
|
|
|
|
var start_batch = {}; |
|
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
|
|
|
|
metadata._getCoreRepresentation(); |
|
|
|
|
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
|
|
|
|
call.startBatch(start_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
// The call has stopped for some reason. A non-OK status will arrive
|
|
|
|
|
// in the other batch.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
stream.emit('metadata', Metadata._fromCoreRepresentation( |
|
|
|
|
response.metadata)); |
|
|
|
|
}); |
|
|
|
|
var status_batch = {}; |
|
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
|
|
|
|
call.startBatch(status_batch, function(err, response) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
response.status.metadata = Metadata._fromCoreRepresentation( |
|
|
|
|
response.status.metadata); |
|
|
|
|
stream._receiveStatus(response.status); |
|
|
|
|
}); |
|
|
|
|
return stream; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
Client.prototype.close = function() { |
|
|
|
|
this.$channel.close(); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Return the underlying channel object for the specified client |
|
|
|
|
* @return {Channel} The channel |
|
|
|
|
*/ |
|
|
|
|
Client.prototype.getChannel = function() { |
|
|
|
|
return this.$channel; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Wait for the client to be ready. The callback will be called when the |
|
|
|
|
* client has successfully connected to the server, and it will be called |
|
|
|
|
* with an error if the attempt to connect to the server has unrecoverablly |
|
|
|
|
* failed or if the deadline expires. This function will make the channel |
|
|
|
|
* start connecting if it has not already done so. |
|
|
|
|
* @param {(Date|Number)} deadline When to stop waiting for a connection. Pass |
|
|
|
|
* Infinity to wait forever. |
|
|
|
|
* @param {function(Error)} callback The callback to call when done attempting |
|
|
|
|
* to connect. |
|
|
|
|
*/ |
|
|
|
|
Client.prototype.waitForReady = function(deadline, callback) { |
|
|
|
|
var self = this; |
|
|
|
|
var checkState = function(err) { |
|
|
|
|
if (err) { |
|
|
|
|
callback(new Error('Failed to connect before the deadline')); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
var new_state = self.$channel.getConnectivityState(true); |
|
|
|
|
if (new_state === grpc.connectivityState.READY) { |
|
|
|
|
callback(); |
|
|
|
|
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) { |
|
|
|
|
callback(new Error('Failed to connect to server')); |
|
|
|
|
} else { |
|
|
|
|
self.$channel.watchConnectivityState(new_state, deadline, checkState); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
checkState(); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Map with short names for each of the requester maker functions. Used in |
|
|
|
|
* makeClientConstructor |
|
|
|
|
* @access private |
|
|
|
|
*/ |
|
|
|
|
var requester_makers = { |
|
|
|
|
unary: makeUnaryRequestFunction, |
|
|
|
|
server_stream: makeServerStreamRequestFunction, |
|
|
|
|
client_stream: makeClientStreamRequestFunction, |
|
|
|
|
bidi: makeBidiStreamRequestFunction |
|
|
|
|
var requester_funcs = { |
|
|
|
|
unary: Client.prototype.makeUnaryRequest, |
|
|
|
|
server_stream: Client.prototype.makeServerStreamRequest, |
|
|
|
|
client_stream: Client.prototype.makeClientStreamRequest, |
|
|
|
|
bidi: Client.prototype.makeBidiStreamRequest |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
function getDefaultValues(metadata, options) { |
|
|
|
@ -675,6 +804,7 @@ function getDefaultValues(metadata, options) { |
|
|
|
|
/** |
|
|
|
|
* Map with wrappers for each type of requester function to make it use the old |
|
|
|
|
* argument order with optional arguments after the callback. |
|
|
|
|
* @access private |
|
|
|
|
*/ |
|
|
|
|
var deprecated_request_wrap = { |
|
|
|
|
unary: function(makeUnaryRequest) { |
|
|
|
@ -700,55 +830,33 @@ var deprecated_request_wrap = { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Creates a constructor for a client with the given methods. The methods object |
|
|
|
|
* maps method name to an object with the following keys: |
|
|
|
|
* path: The path on the server for accessing the method. For example, for |
|
|
|
|
* protocol buffers, we use "/service_name/method_name" |
|
|
|
|
* requestStream: bool indicating whether the client sends a stream |
|
|
|
|
* resonseStream: bool indicating whether the server sends a stream |
|
|
|
|
* requestSerialize: function to serialize request objects |
|
|
|
|
* responseDeserialize: function to deserialize response objects |
|
|
|
|
* @param {Object} methods An object mapping method names to method attributes |
|
|
|
|
* Creates a constructor for a client with the given methods, as specified in |
|
|
|
|
* the methods argument. |
|
|
|
|
* @param {module:src/common~ServiceDefinition} methods An object mapping |
|
|
|
|
* method names to method attributes |
|
|
|
|
* @param {string} serviceName The fully qualified name of the service |
|
|
|
|
* @param {Object} class_options An options object. Currently only uses the key |
|
|
|
|
* deprecatedArgumentOrder, a boolean that Indicates that the old argument |
|
|
|
|
* order should be used for methods, with optional arguments at the end |
|
|
|
|
* instead of the callback at the end. Defaults to false. This option is |
|
|
|
|
* only a temporary stopgap measure to smooth an API breakage. |
|
|
|
|
* @param {Object} class_options An options object. |
|
|
|
|
* @param {boolean=} [class_options.deprecatedArgumentOrder=false] Indicates |
|
|
|
|
* that the old argument order should be used for methods, with optional |
|
|
|
|
* arguments at the end instead of the callback at the end. This option |
|
|
|
|
* is only a temporary stopgap measure to smooth an API breakage. |
|
|
|
|
* It is deprecated, and new code should not use it. |
|
|
|
|
* @return {function(string, Object)} New client constructor |
|
|
|
|
* @return {function(string, Object)} New client constructor, which is a |
|
|
|
|
* subclass of [grpc.Client]{@link module:src/client.Client}, and has the |
|
|
|
|
* same arguments as that constructor. |
|
|
|
|
*/ |
|
|
|
|
exports.makeClientConstructor = function(methods, serviceName, |
|
|
|
|
class_options) { |
|
|
|
|
if (!class_options) { |
|
|
|
|
class_options = {}; |
|
|
|
|
} |
|
|
|
|
/** |
|
|
|
|
* Create a client with the given methods |
|
|
|
|
* @constructor |
|
|
|
|
* @param {string} address The address of the server to connect to |
|
|
|
|
* @param {grpc.Credentials} credentials Credentials to use to connect |
|
|
|
|
* to the server |
|
|
|
|
* @param {Object} options Options to pass to the underlying channel |
|
|
|
|
*/ |
|
|
|
|
function Client(address, credentials, options) { |
|
|
|
|
if (!options) { |
|
|
|
|
options = {}; |
|
|
|
|
} |
|
|
|
|
/* Append the grpc-node user agent string after the application user agent |
|
|
|
|
* string, and put the combination at the beginning of the user agent string |
|
|
|
|
*/ |
|
|
|
|
if (options['grpc.primary_user_agent']) { |
|
|
|
|
options['grpc.primary_user_agent'] += ' '; |
|
|
|
|
} else { |
|
|
|
|
options['grpc.primary_user_agent'] = ''; |
|
|
|
|
} |
|
|
|
|
options['grpc.primary_user_agent'] += 'grpc-node/' + version; |
|
|
|
|
/* Private fields use $ as a prefix instead of _ because it is an invalid |
|
|
|
|
* prefix of a method name */ |
|
|
|
|
this.$channel = new grpc.Channel(address, credentials, options); |
|
|
|
|
|
|
|
|
|
function ServiceClient(address, credentials, options) { |
|
|
|
|
Client.call(this, address, credentials, options); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ServiceClient, Client); |
|
|
|
|
|
|
|
|
|
_.each(methods, function(attrs, name) { |
|
|
|
|
var method_type; |
|
|
|
|
if (_.startsWith(name, '$')) { |
|
|
|
@ -769,20 +877,20 @@ exports.makeClientConstructor = function(methods, serviceName, |
|
|
|
|
} |
|
|
|
|
var serialize = attrs.requestSerialize; |
|
|
|
|
var deserialize = attrs.responseDeserialize; |
|
|
|
|
var method_func = requester_makers[method_type]( |
|
|
|
|
attrs.path, serialize, deserialize); |
|
|
|
|
var method_func = _.partial(requester_funcs[method_type], attrs.path, |
|
|
|
|
serialize, deserialize); |
|
|
|
|
if (class_options.deprecatedArgumentOrder) { |
|
|
|
|
Client.prototype[name] = deprecated_request_wrap(method_func); |
|
|
|
|
ServiceClient.prototype[name] = deprecated_request_wrap(method_func); |
|
|
|
|
} else { |
|
|
|
|
Client.prototype[name] = method_func; |
|
|
|
|
ServiceClient.prototype[name] = method_func; |
|
|
|
|
} |
|
|
|
|
// Associate all provided attributes with the method
|
|
|
|
|
_.assign(Client.prototype[name], attrs); |
|
|
|
|
_.assign(ServiceClient.prototype[name], attrs); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Client.service = methods; |
|
|
|
|
ServiceClient.service = methods; |
|
|
|
|
|
|
|
|
|
return Client; |
|
|
|
|
return ServiceClient; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -791,7 +899,7 @@ exports.makeClientConstructor = function(methods, serviceName, |
|
|
|
|
* @return {Channel} The channel |
|
|
|
|
*/ |
|
|
|
|
exports.getClientChannel = function(client) { |
|
|
|
|
return client.$channel; |
|
|
|
|
return Client.prototype.getChannel.call(client); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -807,21 +915,7 @@ exports.getClientChannel = function(client) { |
|
|
|
|
* to connect. |
|
|
|
|
*/ |
|
|
|
|
exports.waitForClientReady = function(client, deadline, callback) { |
|
|
|
|
var checkState = function(err) { |
|
|
|
|
if (err) { |
|
|
|
|
callback(new Error('Failed to connect before the deadline')); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
var new_state = client.$channel.getConnectivityState(true); |
|
|
|
|
if (new_state === grpc.connectivityState.READY) { |
|
|
|
|
callback(); |
|
|
|
|
} else if (new_state === grpc.connectivityState.FATAL_FAILURE) { |
|
|
|
|
callback(new Error('Failed to connect to server')); |
|
|
|
|
} else { |
|
|
|
|
client.$channel.watchConnectivityState(new_state, deadline, checkState); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
checkState(); |
|
|
|
|
Client.prototype.waitForReady.call(client, deadline, callback); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|