|
|
|
@ -98,36 +98,9 @@ function ClientWritableObjectStream(stream) { |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
util.inherits(ClientBidiObjectStream, Duplex); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Class for representing a gRPC bidi streaming call as a Node stream on the |
|
|
|
|
* client side. Extends from stream.Duplex. |
|
|
|
|
* @constructor |
|
|
|
|
* @param {stream} stream Underlying binary Duplex stream for the call |
|
|
|
|
*/ |
|
|
|
|
function ClientBidiObjectStream(stream) { |
|
|
|
|
var options = {objectMode: true}; |
|
|
|
|
Duplex.call(this, options); |
|
|
|
|
this._stream = stream; |
|
|
|
|
var self = this; |
|
|
|
|
forwardEvent(stream, this, 'status'); |
|
|
|
|
forwardEvent(stream, this, 'metadata'); |
|
|
|
|
this._stream.on('data', function forwardData(chunk) { |
|
|
|
|
if (!self.push(chunk)) { |
|
|
|
|
self._stream.pause(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
this._stream.pause(); |
|
|
|
|
this.on('finish', function() { |
|
|
|
|
this._stream.end(); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* _read implementation for both types of streams that allow reading. |
|
|
|
|
* @this {ClientReadableObjectStream|ClientBidiObjectStream} |
|
|
|
|
* @this {ClientReadableObjectStream} |
|
|
|
|
* @param {number} size Ignored |
|
|
|
|
*/ |
|
|
|
|
function _read(size) { |
|
|
|
@ -138,14 +111,10 @@ function _read(size) { |
|
|
|
|
* See docs for _read |
|
|
|
|
*/ |
|
|
|
|
ClientReadableObjectStream.prototype._read = _read; |
|
|
|
|
/** |
|
|
|
|
* See docs for _read |
|
|
|
|
*/ |
|
|
|
|
ClientBidiObjectStream.prototype._read = _read; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* _write implementation for both types of streams that allow writing |
|
|
|
|
* @this {ClientWritableObjectStream|ClientBidiObjectStream} |
|
|
|
|
* @this {ClientWritableObjectStream} |
|
|
|
|
* @param {*} chunk The value to write to the stream |
|
|
|
|
* @param {string} encoding Ignored |
|
|
|
|
* @param {function(Error)} callback Callback to call when finished writing |
|
|
|
@ -158,10 +127,6 @@ function _write(chunk, encoding, callback) { |
|
|
|
|
* See docs for _write |
|
|
|
|
*/ |
|
|
|
|
ClientWritableObjectStream.prototype._write = _write; |
|
|
|
|
/** |
|
|
|
|
* See docs for _write |
|
|
|
|
*/ |
|
|
|
|
ClientBidiObjectStream.prototype._write = _write; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get a function that can make unary requests to the specified method. |
|
|
|
@ -297,8 +262,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { |
|
|
|
|
function makeBidiStreamRequest(metadata, deadline) { |
|
|
|
|
var stream = client.makeRequest(this.channel, method, serialize, |
|
|
|
|
deserialize, metadata, deadline); |
|
|
|
|
var obj_stream = new ClientBidiObjectStream(stream); |
|
|
|
|
return obj_stream; |
|
|
|
|
return stream; |
|
|
|
|
} |
|
|
|
|
return makeBidiStreamRequest; |
|
|
|
|
} |
|
|
|
|