From c92499d1d91091d4a2cd437f46534a353fb0bee6 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 23 Jan 2015 14:52:01 -0800 Subject: [PATCH 1/4] Removed some duplicate stream code --- src/node/client.js | 6 +-- src/node/server.js | 2 +- src/node/surface_client.js | 56 +++++++++------------ src/node/surface_server.js | 73 ++++++++++------------------ src/node/test/interop_sanity_test.js | 2 +- 5 files changed, 56 insertions(+), 83 deletions(-) diff --git a/src/node/client.js b/src/node/client.js index 2fefd14bbc5..7007852b939 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -105,7 +105,7 @@ function GrpcClientStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(data) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; @@ -155,7 +155,7 @@ GrpcClientStream.prototype._read = function(size) { */ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { var self = this; - self._call.startWrite(chunk, function(event) { + self._call.startWrite(self.serialize(chunk), function(event) { callback(); }, 0); }; @@ -185,7 +185,7 @@ function makeRequest(channel, if (metadata) { call.addMetadata(metadata); } - return new GrpcClientStream(call); + return new GrpcClientStream(call, serialize, deserialize); } /** diff --git a/src/node/server.js b/src/node/server.js index eca20aa5fd0..fc7144a9c14 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -151,7 +151,7 @@ function GrpcServerStream(call, serialize, deserialize) { return; } var data = event.data; - if (self.push(deserialize(data)) && data != null) { + if (self.push(self.deserialize(data)) && data != null) { self._call.startRead(readCallback); } else { reading = false; diff --git a/src/node/surface_client.js b/src/node/surface_client.js index 996e3d101fc..8996f0be40e 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -63,18 +63,16 @@ util.inherits(ClientReadableObjectStream, Readable); * client side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -88,14 +86,11 @@ util.inherits(ClientWritableObjectStream, Writable); * client side. Extends from stream.Writable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options */ -function ClientWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientWritableObjectStream(stream) { + var options = {objectMode: true}; Writable.call(this, options); this._stream = stream; - this._serialize = serialize; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this.on('finish', function() { @@ -111,20 +106,16 @@ util.inherits(ClientBidiObjectStream, Duplex); * client side. Extends from stream.Duplex. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ClientBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ClientBidiObjectStream(stream) { + var options = {objectMode: true}; Duplex.call(this, options); this._stream = stream; - this._serialize = serialize; var self = this; forwardEvent(stream, this, 'status'); forwardEvent(stream, this, 'metadata'); this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -160,7 +151,7 @@ ClientBidiObjectStream.prototype._read = _read; * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** @@ -196,15 +187,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); var emitter = new EventEmitter(); forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); - stream.write(serialize(argument)); + stream.write(argument); stream.end(); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -236,11 +228,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream, serialize, {}); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientWritableObjectStream(stream); stream.on('data', function forwardData(chunk) { try { - callback(null, deserialize(chunk)); + callback(null, chunk); } catch (e) { callback(e); } @@ -272,9 +265,10 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream, deserialize, {}); - stream.write(serialize(argument)); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientReadableObjectStream(stream); + stream.write(argument); stream.end(); return obj_stream; } @@ -301,11 +295,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeBidiStreamRequest(metadata, deadline) { - var stream = client.makeRequest(this.channel, method, metadata, deadline); - var obj_stream = new ClientBidiObjectStream(stream, - serialize, - deserialize, - {}); + var stream = client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); + var obj_stream = new ClientBidiObjectStream(stream); return obj_stream; } return makeBidiStreamRequest; diff --git a/src/node/surface_server.js b/src/node/surface_server.js index bc688839fe5..28af5ab42f9 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -54,11 +54,9 @@ util.inherits(ServerReadableObjectStream, Readable); * server side. Extends from stream.Readable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ServerReadableObjectStream(stream, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ServerReadableObjectStream(stream) { + var options = {objectMode: true}; Readable.call(this, options); this._stream = stream; Object.defineProperty(this, 'cancelled', { @@ -66,7 +64,7 @@ function ServerReadableObjectStream(stream, deserialize, options) { }); var self = this; this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -83,14 +81,11 @@ util.inherits(ServerWritableObjectStream, Writable); * server side. Extends from stream.Writable. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {object} options Stream options */ -function ServerWritableObjectStream(stream, serialize, options) { - options = _.extend(options, {objectMode: true}); +function ServerWritableObjectStream(stream) { + var options = {objectMode: true}; Writable.call(this, options); this._stream = stream; - this._serialize = serialize; this.on('finish', function() { this._stream.end(); }); @@ -103,18 +98,14 @@ util.inherits(ServerBidiObjectStream, Duplex); * server side. Extends from stream.Duplex. * @constructor * @param {stream} stream Underlying binary Duplex stream for the call - * @param {function(*):Buffer} serialize Function for serializing objects - * @param {function(Buffer)} deserialize Function for deserializing binary data - * @param {object} options Stream options */ -function ServerBidiObjectStream(stream, serialize, deserialize, options) { - options = _.extend(options, {objectMode: true}); +function ServerBidiObjectStream(stream) { + var options = {objectMode: true}; Duplex.call(this, options); this._stream = stream; - this._serialize = serialize; var self = this; this._stream.on('data', function forwardData(chunk) { - if (!self.push(deserialize(chunk))) { + if (!self.push(chunk)) { self._stream.pause(); } }); @@ -153,7 +144,7 @@ ServerBidiObjectStream.prototype._read = _read; * @param {function(Error)} callback Callback to call when finished writing */ function _write(chunk, encoding, callback) { - this._stream.write(this._serialize(chunk), encoding, callback); + this._stream.write(chunk, encoding, callback); } /** @@ -168,11 +159,9 @@ ServerBidiObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function * @param {function(Object, function(Error, *))} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeUnaryHandler(handler, serialize, deserialize) { +function makeUnaryHandler(handler) { /** * Handles a stream by reading a single data value, passing it to the handler, * and writing the response back to the stream. @@ -180,7 +169,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { */ return function handleUnaryCall(stream) { stream.on('data', function handleUnaryData(value) { - var call = {request: deserialize(value)}; + var call = {request: value}; Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); @@ -188,7 +177,7 @@ function makeUnaryHandler(handler, serialize, deserialize) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -201,23 +190,21 @@ function makeUnaryHandler(handler, serialize, deserialize) { * function * @param {function(Readable, function(Error, *))} handler Client stream call * handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeClientStreamHandler(handler, serialize, deserialize) { +function makeClientStreamHandler(handler) { /** * Handles a stream by passing a deserializing stream to the handler and * writing the response back to the stream. * @param {stream} stream Binary data stream */ return function handleClientStreamCall(stream) { - var object_stream = new ServerReadableObjectStream(stream, deserialize, {}); + var object_stream = new ServerReadableObjectStream(stream); handler(object_stream, function sendClientStreamData(err, value) { if (err) { stream.emit('error', err); } else { - stream.write(serialize(value)); + stream.write(value); stream.end(); } }); @@ -228,11 +215,9 @@ function makeClientStreamHandler(handler, serialize, deserialize) { * Creates a binary stream handler function from a server stream handler * function * @param {function(Writable)} handler Server stream call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeServerStreamHandler(handler, serialize, deserialize) { +function makeServerStreamHandler(handler) { /** * Handles a stream by attaching it to a serializing stream, and passing it to * the handler. @@ -240,10 +225,8 @@ function makeServerStreamHandler(handler, serialize, deserialize) { */ return function handleServerStreamCall(stream) { stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream, - serialize, - {}); - object_stream.request = deserialize(value); + var object_stream = new ServerWritableObjectStream(stream); + object_stream.request = value; handler(object_stream); }); }; @@ -252,21 +235,16 @@ function makeServerStreamHandler(handler, serialize, deserialize) { /** * Creates a binary stream handler function from a bidi stream handler function * @param {function(Duplex)} handler Unary call handler - * @param {function(*):Buffer} serialize Serialization function - * @param {function(Buffer):*} deserialize Deserialization function * @return {function(stream)} Binary stream handler */ -function makeBidiStreamHandler(handler, serialize, deserialize) { +function makeBidiStreamHandler(handler) { /** * Handles a stream by wrapping it in a serializing and deserializing object * stream, and passing it to the handler. * @param {stream} stream Binary data stream */ return function handleBidiStreamCall(stream) { - var object_stream = new ServerBidiObjectStream(stream, - serialize, - deserialize, - {}); + var object_stream = new ServerBidiObjectStream(stream); handler(object_stream); }; } @@ -341,10 +319,13 @@ function makeServerConstructor(services) { common.fullyQualifiedName(method) + ' not provided.'); } var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)], - common.serializeCls(method.resolvedResponseType.build()), - common.deserializeCls(method.resolvedRequestType.build())); - server.register(prefix + capitalize(method.name), binary_handler); + service_handlers[service_name][decapitalize(method.name)]); + var serialize = common.serializeCls( + method.resolvedResponseType.build()); + var deserialize = common.deserializeCls( + method.resolvedRequestType.build()); + server.register(prefix + capitalize(method.name), binary_handler, + serialize, deserialize); }); }, this); } diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 3c062b97882..8ea48c359f8 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -65,7 +65,7 @@ describe('Interop tests', function() { it('should pass ping_pong', function(done) { interop_client.runTest(port, name_override, 'ping_pong', true, done); }); - it.skip('should pass empty_stream', function(done) { + it('should pass empty_stream', function(done) { interop_client.runTest(port, name_override, 'empty_stream', true, done); }); }); From 46c2461c8bfc60311f2eb607c35389f6437a02b1 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 23 Jan 2015 15:00:38 -0800 Subject: [PATCH 2/4] Removed unnecessary bidi stream wrappers --- src/node/surface_client.js | 42 +++------------------------------ src/node/surface_server.js | 48 ++------------------------------------ 2 files changed, 5 insertions(+), 85 deletions(-) diff --git a/src/node/surface_client.js b/src/node/surface_client.js index 8996f0be40e..abec999cabb 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -98,36 +98,9 @@ function ClientWritableObjectStream(stream) { }); } - -util.inherits(ClientBidiObjectStream, Duplex); - -/** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * client side. Extends from stream.Duplex. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientBidiObjectStream(stream) { - var options = {objectMode: true}; - Duplex.call(this, options); - this._stream = stream; - var self = this; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.pause(); - this.on('finish', function() { - this._stream.end(); - }); -} - /** * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream|ClientBidiObjectStream} + * @this {ClientReadableObjectStream} * @param {number} size Ignored */ function _read(size) { @@ -138,14 +111,10 @@ function _read(size) { * See docs for _read */ ClientReadableObjectStream.prototype._read = _read; -/** - * See docs for _read - */ -ClientBidiObjectStream.prototype._read = _read; /** * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream|ClientBidiObjectStream} + * @this {ClientWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing @@ -158,10 +127,6 @@ function _write(chunk, encoding, callback) { * See docs for _write */ ClientWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ClientBidiObjectStream.prototype._write = _write; /** * Get a function that can make unary requests to the specified method. @@ -297,8 +262,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { function makeBidiStreamRequest(metadata, deadline) { var stream = client.makeRequest(this.channel, method, serialize, deserialize, metadata, deadline); - var obj_stream = new ClientBidiObjectStream(stream); - return obj_stream; + return stream; } return makeBidiStreamRequest; } diff --git a/src/node/surface_server.js b/src/node/surface_server.js index 28af5ab42f9..e3c48b13e19 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -90,34 +90,6 @@ function ServerWritableObjectStream(stream) { this._stream.end(); }); } - -util.inherits(ServerBidiObjectStream, Duplex); - -/** - * Class for representing a gRPC bidi streaming call as a Node stream on the - * server side. Extends from stream.Duplex. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerBidiObjectStream(stream) { - var options = {objectMode: true}; - Duplex.call(this, options); - this._stream = stream; - var self = this; - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); - this.on('finish', function() { - this._stream.end(); - }); -} - /** * _read implementation for both types of streams that allow reading. * @this {ServerReadableObjectStream|ServerBidiObjectStream} @@ -131,14 +103,10 @@ function _read(size) { * See docs for _read */ ServerReadableObjectStream.prototype._read = _read; -/** - * See docs for _read - */ -ServerBidiObjectStream.prototype._read = _read; /** * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream|ServerBidiObjectStream} + * @this {ServerWritableObjectStream} * @param {*} chunk The value to write to the stream * @param {string} encoding Ignored * @param {function(Error)} callback Callback to call when finished writing @@ -151,10 +119,6 @@ function _write(chunk, encoding, callback) { * See docs for _write */ ServerWritableObjectStream.prototype._write = _write; -/** - * See docs for _write - */ -ServerBidiObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function @@ -238,15 +202,7 @@ function makeServerStreamHandler(handler) { * @return {function(stream)} Binary stream handler */ function makeBidiStreamHandler(handler) { - /** - * Handles a stream by wrapping it in a serializing and deserializing object - * stream, and passing it to the handler. - * @param {stream} stream Binary data stream - */ - return function handleBidiStreamCall(stream) { - var object_stream = new ServerBidiObjectStream(stream); - handler(object_stream); - }; + return handler; } /** From 2ed57b48588270d37846c35694b0d484e912ff4b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 23 Jan 2015 15:15:46 -0800 Subject: [PATCH 3/4] Moved some code around for clarity --- src/node/surface_client.js | 28 ++++++++++++++-------------- src/node/surface_server.js | 27 ++++++++++++++------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/node/surface_client.js b/src/node/surface_client.js index abec999cabb..aba8feeada9 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -79,6 +79,20 @@ function ClientReadableObjectStream(stream) { this._stream.pause(); } +/** + * _read implementation for both types of streams that allow reading. + * @this {ClientReadableObjectStream} + * @param {number} size Ignored + */ +function _read(size) { + this._stream.resume(); +} + +/** + * See docs for _read + */ +ClientReadableObjectStream.prototype._read = _read; + util.inherits(ClientWritableObjectStream, Writable); /** @@ -98,20 +112,6 @@ function ClientWritableObjectStream(stream) { }); } -/** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; - /** * _write implementation for both types of streams that allow writing * @this {ClientWritableObjectStream} diff --git a/src/node/surface_server.js b/src/node/surface_server.js index e3c48b13e19..07c5339f62f 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -74,6 +74,20 @@ function ServerReadableObjectStream(stream) { this._stream.pause(); } +/** + * _read implementation for both types of streams that allow reading. + * @this {ServerReadableObjectStream|ServerBidiObjectStream} + * @param {number} size Ignored + */ +function _read(size) { + this._stream.resume(); +} + +/** + * See docs for _read + */ +ServerReadableObjectStream.prototype._read = _read; + util.inherits(ServerWritableObjectStream, Writable); /** @@ -90,19 +104,6 @@ function ServerWritableObjectStream(stream) { this._stream.end(); }); } -/** - * _read implementation for both types of streams that allow reading. - * @this {ServerReadableObjectStream|ServerBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ServerReadableObjectStream.prototype._read = _read; /** * _write implementation for both types of streams that allow writing From 4097c1d2010ef378b8bdea65a5e7e4fd6788a698 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 26 Jan 2015 09:21:22 -0800 Subject: [PATCH 4/4] Shortened a function --- src/node/surface_client.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/node/surface_client.js b/src/node/surface_client.js index aba8feeada9..b63ae13e8dc 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -260,9 +260,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { * @return {EventEmitter} An event emitter for stream related events */ function makeBidiStreamRequest(metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - return stream; + return client.makeRequest(this.channel, method, serialize, + deserialize, metadata, deadline); } return makeBidiStreamRequest; }