|
|
|
@ -54,11 +54,9 @@ util.inherits(ServerReadableObjectStream, Readable); |
|
|
|
|
* server side. Extends from stream.Readable. |
|
|
|
|
* @constructor |
|
|
|
|
* @param {stream} stream Underlying binary Duplex stream for the call |
|
|
|
|
* @param {function(Buffer)} deserialize Function for deserializing binary data |
|
|
|
|
* @param {object} options Stream options |
|
|
|
|
*/ |
|
|
|
|
function ServerReadableObjectStream(stream, deserialize, options) { |
|
|
|
|
options = _.extend(options, {objectMode: true}); |
|
|
|
|
function ServerReadableObjectStream(stream) { |
|
|
|
|
var options = {objectMode: true}; |
|
|
|
|
Readable.call(this, options); |
|
|
|
|
this._stream = stream; |
|
|
|
|
Object.defineProperty(this, 'cancelled', { |
|
|
|
@ -66,7 +64,7 @@ function ServerReadableObjectStream(stream, deserialize, options) { |
|
|
|
|
}); |
|
|
|
|
var self = this; |
|
|
|
|
this._stream.on('data', function forwardData(chunk) { |
|
|
|
|
if (!self.push(deserialize(chunk))) { |
|
|
|
|
if (!self.push(chunk)) { |
|
|
|
|
self._stream.pause(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
@ -76,57 +74,6 @@ function ServerReadableObjectStream(stream, deserialize, options) { |
|
|
|
|
this._stream.pause(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ServerWritableObjectStream, Writable); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Class for representing a gRPC server streaming call as a Node stream on the |
|
|
|
|
* server side. Extends from stream.Writable. |
|
|
|
|
* @constructor |
|
|
|
|
* @param {stream} stream Underlying binary Duplex stream for the call |
|
|
|
|
* @param {function(*):Buffer} serialize Function for serializing objects |
|
|
|
|
* @param {object} options Stream options |
|
|
|
|
*/ |
|
|
|
|
function ServerWritableObjectStream(stream, serialize, options) { |
|
|
|
|
options = _.extend(options, {objectMode: true}); |
|
|
|
|
Writable.call(this, options); |
|
|
|
|
this._stream = stream; |
|
|
|
|
this._serialize = serialize; |
|
|
|
|
this.on('finish', function() { |
|
|
|
|
this._stream.end(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
util.inherits(ServerBidiObjectStream, Duplex); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Class for representing a gRPC bidi streaming call as a Node stream on the |
|
|
|
|
* server side. Extends from stream.Duplex. |
|
|
|
|
* @constructor |
|
|
|
|
* @param {stream} stream Underlying binary Duplex stream for the call |
|
|
|
|
* @param {function(*):Buffer} serialize Function for serializing objects |
|
|
|
|
* @param {function(Buffer)} deserialize Function for deserializing binary data |
|
|
|
|
* @param {object} options Stream options |
|
|
|
|
*/ |
|
|
|
|
function ServerBidiObjectStream(stream, serialize, deserialize, options) { |
|
|
|
|
options = _.extend(options, {objectMode: true}); |
|
|
|
|
Duplex.call(this, options); |
|
|
|
|
this._stream = stream; |
|
|
|
|
this._serialize = serialize; |
|
|
|
|
var self = this; |
|
|
|
|
this._stream.on('data', function forwardData(chunk) { |
|
|
|
|
if (!self.push(deserialize(chunk))) { |
|
|
|
|
self._stream.pause(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
this._stream.on('end', function forwardEnd() { |
|
|
|
|
self.push(null); |
|
|
|
|
}); |
|
|
|
|
this._stream.pause(); |
|
|
|
|
this.on('finish', function() { |
|
|
|
|
this._stream.end(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* _read implementation for both types of streams that allow reading. |
|
|
|
|
* @this {ServerReadableObjectStream|ServerBidiObjectStream} |
|
|
|
@ -140,39 +87,46 @@ function _read(size) { |
|
|
|
|
* See docs for _read |
|
|
|
|
*/ |
|
|
|
|
ServerReadableObjectStream.prototype._read = _read; |
|
|
|
|
|
|
|
|
|
util.inherits(ServerWritableObjectStream, Writable); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* See docs for _read |
|
|
|
|
* Class for representing a gRPC server streaming call as a Node stream on the |
|
|
|
|
* server side. Extends from stream.Writable. |
|
|
|
|
* @constructor |
|
|
|
|
* @param {stream} stream Underlying binary Duplex stream for the call |
|
|
|
|
*/ |
|
|
|
|
ServerBidiObjectStream.prototype._read = _read; |
|
|
|
|
function ServerWritableObjectStream(stream) { |
|
|
|
|
var options = {objectMode: true}; |
|
|
|
|
Writable.call(this, options); |
|
|
|
|
this._stream = stream; |
|
|
|
|
this.on('finish', function() { |
|
|
|
|
this._stream.end(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* _write implementation for both types of streams that allow writing |
|
|
|
|
* @this {ServerWritableObjectStream|ServerBidiObjectStream} |
|
|
|
|
* @this {ServerWritableObjectStream} |
|
|
|
|
* @param {*} chunk The value to write to the stream |
|
|
|
|
* @param {string} encoding Ignored |
|
|
|
|
* @param {function(Error)} callback Callback to call when finished writing |
|
|
|
|
*/ |
|
|
|
|
function _write(chunk, encoding, callback) { |
|
|
|
|
this._stream.write(this._serialize(chunk), encoding, callback); |
|
|
|
|
this._stream.write(chunk, encoding, callback); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* See docs for _write |
|
|
|
|
*/ |
|
|
|
|
ServerWritableObjectStream.prototype._write = _write; |
|
|
|
|
/** |
|
|
|
|
* See docs for _write |
|
|
|
|
*/ |
|
|
|
|
ServerBidiObjectStream.prototype._write = _write; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Creates a binary stream handler function from a unary handler function |
|
|
|
|
* @param {function(Object, function(Error, *))} handler Unary call handler |
|
|
|
|
* @param {function(*):Buffer} serialize Serialization function |
|
|
|
|
* @param {function(Buffer):*} deserialize Deserialization function |
|
|
|
|
* @return {function(stream)} Binary stream handler |
|
|
|
|
*/ |
|
|
|
|
function makeUnaryHandler(handler, serialize, deserialize) { |
|
|
|
|
function makeUnaryHandler(handler) { |
|
|
|
|
/** |
|
|
|
|
* Handles a stream by reading a single data value, passing it to the handler, |
|
|
|
|
* and writing the response back to the stream. |
|
|
|
@ -180,7 +134,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { |
|
|
|
|
*/ |
|
|
|
|
return function handleUnaryCall(stream) { |
|
|
|
|
stream.on('data', function handleUnaryData(value) { |
|
|
|
|
var call = {request: deserialize(value)}; |
|
|
|
|
var call = {request: value}; |
|
|
|
|
Object.defineProperty(call, 'cancelled', { |
|
|
|
|
get: function() { return stream.cancelled;} |
|
|
|
|
}); |
|
|
|
@ -188,7 +142,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
} else { |
|
|
|
|
stream.write(serialize(value)); |
|
|
|
|
stream.write(value); |
|
|
|
|
stream.end(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
@ -201,23 +155,21 @@ function makeUnaryHandler(handler, serialize, deserialize) { |
|
|
|
|
* function |
|
|
|
|
* @param {function(Readable, function(Error, *))} handler Client stream call |
|
|
|
|
* handler |
|
|
|
|
* @param {function(*):Buffer} serialize Serialization function |
|
|
|
|
* @param {function(Buffer):*} deserialize Deserialization function |
|
|
|
|
* @return {function(stream)} Binary stream handler |
|
|
|
|
*/ |
|
|
|
|
function makeClientStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
function makeClientStreamHandler(handler) { |
|
|
|
|
/** |
|
|
|
|
* Handles a stream by passing a deserializing stream to the handler and |
|
|
|
|
* writing the response back to the stream. |
|
|
|
|
* @param {stream} stream Binary data stream |
|
|
|
|
*/ |
|
|
|
|
return function handleClientStreamCall(stream) { |
|
|
|
|
var object_stream = new ServerReadableObjectStream(stream, deserialize, {}); |
|
|
|
|
var object_stream = new ServerReadableObjectStream(stream); |
|
|
|
|
handler(object_stream, function sendClientStreamData(err, value) { |
|
|
|
|
if (err) { |
|
|
|
|
stream.emit('error', err); |
|
|
|
|
} else { |
|
|
|
|
stream.write(serialize(value)); |
|
|
|
|
stream.write(value); |
|
|
|
|
stream.end(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
@ -228,11 +180,9 @@ function makeClientStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
* Creates a binary stream handler function from a server stream handler |
|
|
|
|
* function |
|
|
|
|
* @param {function(Writable)} handler Server stream call handler |
|
|
|
|
* @param {function(*):Buffer} serialize Serialization function |
|
|
|
|
* @param {function(Buffer):*} deserialize Deserialization function |
|
|
|
|
* @return {function(stream)} Binary stream handler |
|
|
|
|
*/ |
|
|
|
|
function makeServerStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
function makeServerStreamHandler(handler) { |
|
|
|
|
/** |
|
|
|
|
* Handles a stream by attaching it to a serializing stream, and passing it to |
|
|
|
|
* the handler. |
|
|
|
@ -240,10 +190,8 @@ function makeServerStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
*/ |
|
|
|
|
return function handleServerStreamCall(stream) { |
|
|
|
|
stream.on('data', function handleClientData(value) { |
|
|
|
|
var object_stream = new ServerWritableObjectStream(stream, |
|
|
|
|
serialize, |
|
|
|
|
{}); |
|
|
|
|
object_stream.request = deserialize(value); |
|
|
|
|
var object_stream = new ServerWritableObjectStream(stream); |
|
|
|
|
object_stream.request = value; |
|
|
|
|
handler(object_stream); |
|
|
|
|
}); |
|
|
|
|
}; |
|
|
|
@ -252,23 +200,10 @@ function makeServerStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Creates a binary stream handler function from a bidi stream handler function |
|
|
|
|
* @param {function(Duplex)} handler Unary call handler |
|
|
|
|
* @param {function(*):Buffer} serialize Serialization function |
|
|
|
|
* @param {function(Buffer):*} deserialize Deserialization function |
|
|
|
|
* @return {function(stream)} Binary stream handler |
|
|
|
|
*/ |
|
|
|
|
function makeBidiStreamHandler(handler, serialize, deserialize) { |
|
|
|
|
/** |
|
|
|
|
* Handles a stream by wrapping it in a serializing and deserializing object |
|
|
|
|
* stream, and passing it to the handler. |
|
|
|
|
* @param {stream} stream Binary data stream |
|
|
|
|
*/ |
|
|
|
|
return function handleBidiStreamCall(stream) { |
|
|
|
|
var object_stream = new ServerBidiObjectStream(stream, |
|
|
|
|
serialize, |
|
|
|
|
deserialize, |
|
|
|
|
{}); |
|
|
|
|
handler(object_stream); |
|
|
|
|
}; |
|
|
|
|
function makeBidiStreamHandler(handler) { |
|
|
|
|
return handler; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -341,10 +276,13 @@ function makeServerConstructor(services) { |
|
|
|
|
common.fullyQualifiedName(method) + ' not provided.'); |
|
|
|
|
} |
|
|
|
|
var binary_handler = handler_makers[method_type]( |
|
|
|
|
service_handlers[service_name][decapitalize(method.name)], |
|
|
|
|
common.serializeCls(method.resolvedResponseType.build()), |
|
|
|
|
common.deserializeCls(method.resolvedRequestType.build())); |
|
|
|
|
server.register(prefix + capitalize(method.name), binary_handler); |
|
|
|
|
service_handlers[service_name][decapitalize(method.name)]); |
|
|
|
|
var serialize = common.serializeCls( |
|
|
|
|
method.resolvedResponseType.build()); |
|
|
|
|
var deserialize = common.deserializeCls( |
|
|
|
|
method.resolvedRequestType.build()); |
|
|
|
|
server.register(prefix + capitalize(method.name), binary_handler, |
|
|
|
|
serialize, deserialize); |
|
|
|
|
}); |
|
|
|
|
}, this); |
|
|
|
|
} |
|
|
|
|