diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index ce18f77fe71..8737af6cde9 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -145,8 +145,8 @@ function serverStreaming(client, done) { resp_index += 1; }); call.on('status', function(status) { - assert.strictEqual(resp_index, 4); assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(resp_index, 4); if (done) { done(); } diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 54e9715d1e3..271fe5962e3 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -106,6 +106,7 @@ function handleStreamingOutput(call) { testProto.PayloadType.COMPRESSABLE, testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1]; } + console.log('req:', req); _.each(req.response_parameters, function(resp_param) { call.write({ payload: { diff --git a/src/node/src/client.js b/src/node/src/client.js index 88fa9dc2e25..36a6f41d7bb 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -56,6 +56,7 @@ function ClientWritableStream(call, serialize) { this.call = call; this.serialize = common.wrapIgnoreNull(serialize); this.on('finish', function() { + console.log('Send close from client'); var batch = {}; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(batch, function() {}); @@ -90,7 +91,7 @@ function ClientReadableStream(call, deserialize) { this.call = call; this.finished = false; this.reading = false; - this.serialize = common.wrapIgnoreNull(deserialize); + this.deserialize = common.wrapIgnoreNull(deserialize); } function _read(size) { @@ -100,12 +101,15 @@ function _read(size) { * the read queue and starts reading again if applicable * @param {grpc.Event} event READ event object */ - function readCallback(event) { + function readCallback(err, event) { + if (err) { + throw err; + } if (self.finished) { self.push(null); return; } - var data = event.data; + var data = event.read; if (self.push(self.deserialize(data)) && data != null) { var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; @@ -142,12 +146,18 @@ util.inherits(ClientDuplexStream, Duplex); function ClientDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); this.serialize = common.wrapIgnoreNull(serialize); - this.serialize = common.wrapIgnoreNull(deserialize); + this.deserialize = common.wrapIgnoreNull(deserialize); var self = this; var finished = false; // Indicates that a read is currently pending var reading = false; this.call = call; + this.on('finish', function() { + console.log('Send close from client'); + var batch = {}; + batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(batch, function() {}); + }); } ClientDuplexStream.prototype._read = _read; @@ -208,6 +218,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(err); return; } + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } emitter.emit('status', response.status); emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); @@ -265,6 +279,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(err); return; } + console.log(response); + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } stream.emit('status', response.status); callback(null, deserialize(response.read)); }); diff --git a/src/node/src/server.js b/src/node/src/server.js index 2d5396e3b71..3c214913195 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -89,11 +89,13 @@ function sendUnaryResponse(call, value, serialize) { function setUpWritable(stream, serialize) { stream.finished = false; stream.status = { - 'code' : grpc.status.OK, - 'details' : 'OK' + code : grpc.status.OK, + details : 'OK', + metadata : {} }; stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { + console.log('Server sending status'); var batch = {}; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); @@ -115,7 +117,7 @@ function setUpWritable(stream, serialize) { details = err.details; } } - stream.status = {'code': code, 'details': details}; + stream.status = {code: code, details: details, metadata: {}}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -167,6 +169,7 @@ function ServerWritableStream(call, serialize) { function _write(chunk, encoding, callback) { var batch = {}; batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + console.log('Server writing', batch); this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -204,11 +207,14 @@ function _read(size) { return; } if (self.finished) { + console.log('Pushing null'); self.push(null); return; } var data = event.read; + console.log(data); if (self.push(self.deserialize(data)) && data != null) { + console.log('Reading again'); var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; self.call.startBatch(read_batch, readCallback); @@ -234,6 +240,7 @@ util.inherits(ServerDuplexStream, Duplex); function ServerDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); + this.call = call; setUpWritable(this, serialize); setUpReadable(this, deserialize); } @@ -280,7 +287,7 @@ function handleServerStreaming(call, handler, metadata) { stream.emit('error', err); return; } - stream.request = result.read; + stream.request = handler.deserialize(result.read); handler.func(stream); }); } diff --git a/src/node/src/surface_client.js b/src/node/src/surface_client.js deleted file mode 100644 index 16c31809f43..00000000000 --- a/src/node/src/surface_client.js +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var client = require('./client.js'); - -var common = require('./common.js'); - -var EventEmitter = require('events').EventEmitter; - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - - -function forwardEvent(fromEmitter, toEmitter, event) { - fromEmitter.on(event, function forward() { - _.partial(toEmitter.emit, event).apply(toEmitter, arguments); - }); -} - -util.inherits(ClientReadableObjectStream, Readable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * client side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - var self = this; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; - -util.inherits(ClientWritableObjectStream, Writable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ClientWritableObjectStream.prototype._write = _write; - -/** - * Cancel the underlying call - */ -function cancel() { - this._stream.cancel(); -} - -ClientReadableObjectStream.prototype.cancel = cancel; -ClientWritableObjectStream.prototype.cancel = cancel; - -/** - * Get a function that can make unary requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeUnaryRequest - */ -function makeUnaryRequestFunction(method, serialize, deserialize) { - /** - * Make a unary request with this method on the given channel with the given - * argument, callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var emitter = new EventEmitter(); - emitter.cancel = function cancel() { - stream.cancel(); - }; - forwardEvent(stream, emitter, 'status'); - forwardEvent(stream, emitter, 'metadata'); - stream.write(argument); - stream.end(); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return emitter; - } - return makeUnaryRequest; -} - -/** - * Get a function that can make client stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeClientStreamRequest - */ -function makeClientStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a client stream request with this method on the given channel with the - * given callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return obj_stream; - } - return makeClientStreamRequest; -} - -/** - * Get a function that can make server stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeServerStreamRequest - */ -function makeServerStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a server stream request with this method on the given channel with the - * given argument, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream); - stream.write(argument); - stream.end(); - return obj_stream; - } - return makeServerStreamRequest; -} - -/** - * Get a function that can make bidirectional stream requests to the specified - * method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeBidiStreamRequest - */ -function makeBidiStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a bidirectional stream request with this method on the given channel. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeBidiStreamRequest(metadata, deadline) { - return client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - } - return makeBidiStreamRequest; -} - -/** - * Map with short names for each of the requester maker functions. Used in - * makeClientConstructor - */ -var requester_makers = { - unary: makeUnaryRequestFunction, - server_stream: makeServerStreamRequestFunction, - client_stream: makeClientStreamRequestFunction, - bidi: makeBidiStreamRequestFunction -} - -/** - * Creates a constructor for clients for the given service - * @param {ProtoBuf.Reflect.Service} service The service to generate a client - * for - * @return {function(string, Object)} New client constructor - */ -function makeClientConstructor(service) { - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - /** - * Create a client with the given methods - * @constructor - * @param {string} address The address of the server to connect to - * @param {Object} options Options to pass to the underlying channel - */ - function SurfaceClient(address, options) { - this.channel = new client.Channel(address, options); - } - - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - SurfaceClient.prototype[decapitalize(method.name)] = - requester_makers[method_type]( - prefix + capitalize(method.name), - common.serializeCls(method.resolvedRequestType.build()), - common.deserializeCls(method.resolvedResponseType.build())); - }); - - SurfaceClient.service = service; - - return SurfaceClient; -} - -exports.makeClientConstructor = makeClientConstructor; - -/** - * See docs for client.status - */ -exports.status = client.status; -/** - * See docs for client.callError - */ -exports.callError = client.callError; diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js deleted file mode 100644 index a47d1fa23d5..00000000000 --- a/src/node/src/surface_server.js +++ /dev/null @@ -1,340 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var Server = require('./server.js'); - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - -var common = require('./common.js'); - -util.inherits(ServerReadableObjectStream, Readable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * server side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - Object.defineProperty(this, 'cancelled', { - get: function() { return stream.cancelled; } - }); - var self = this; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ServerReadableObjectStream|ServerBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ServerReadableObjectStream.prototype._read = _read; - -util.inherits(ServerWritableObjectStream, Writable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * server side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ServerWritableObjectStream.prototype._write = _write; - -/** - * Creates a binary stream handler function from a unary handler function - * @param {function(Object, function(Error, *), metadata=)} handler Unary call - * handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeUnaryHandler(handler) { - /** - * Handles a stream by reading a single data value, passing it to the handler, - * and writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleUnaryCall(stream, metadata) { - stream.on('data', function handleUnaryData(value) { - var call = {request: value}; - Object.defineProperty(call, 'cancelled', { - get: function() { return stream.cancelled;} - }); - stream.on('cancelled', function() { - call.emit('cancelled'); - }); - handler(call, function sendUnaryData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a client stream handler - * function - * @param {function(Readable, function(Error, *), metadata=)} handler Client - * stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeClientStreamHandler(handler) { - /** - * Handles a stream by passing a deserializing stream to the handler and - * writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleClientStreamCall(stream, metadata) { - var object_stream = new ServerReadableObjectStream(stream); - handler(object_stream, function sendClientStreamData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }; -} - -/** - * Creates a binary stream handler function from a server stream handler - * function - * @param {function(Writable, metadata=)} handler Server stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeServerStreamHandler(handler) { - /** - * Handles a stream by attaching it to a serializing stream, and passing it to - * the handler. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleServerStreamCall(stream, metadata) { - stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream); - object_stream.request = value; - handler(object_stream, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a bidi stream handler function - * @param {function(Duplex, metadata=)} handler Unary call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeBidiStreamHandler(handler) { - return handler; -} - -/** - * Map with short names for each of the handler maker functions. Used in - * makeServerConstructor - */ -var handler_makers = { - unary: makeUnaryHandler, - server_stream: makeServerStreamHandler, - client_stream: makeClientStreamHandler, - bidi: makeBidiStreamHandler -}; - -/** - * Creates a constructor for servers with a service defined by the methods - * object. The methods object has string keys and values of this form: - * {serialize: function, deserialize: function, client_stream: bool, - * server_stream: bool} - * @param {Object} methods Method descriptor for each method the server should - * expose - * @param {string} prefix The prefex to prepend to each method name - * @return {function(Object, Object)} New server constructor - */ -function makeServerConstructor(services) { - var qual_names = []; - _.each(services, function(service) { - _.each(service.children, function(method) { - var name = common.fullyQualifiedName(method); - if (_.indexOf(qual_names, name) !== -1) { - throw new Error('Method ' + name + ' exposed by more than one service'); - } - qual_names.push(name); - }); - }); - /** - * Create a server with the given handlers for all of the methods. - * @constructor - * @param {Object} service_handlers Map from service names to map from method - * names to handlers - * @param {function(string, Object>): - Object>=} getMetadata Callback that - * gets metatada for a given method - * @param {Object=} options Options to pass to the underlying server - */ - function SurfaceServer(service_handlers, getMetadata, options) { - var server = new Server(getMetadata, options); - this.inner_server = server; - _.each(services, function(service) { - var service_name = common.fullyQualifiedName(service); - if (service_handlers[service_name] === undefined) { - throw new Error('Handlers for service ' + - service_name + ' not provided.'); - } - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - if (service_handlers[service_name][decapitalize(method.name)] === - undefined) { - throw new Error('Method handler for ' + - common.fullyQualifiedName(method) + ' not provided.'); - } - var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)]); - var serialize = common.serializeCls( - method.resolvedResponseType.build()); - var deserialize = common.deserializeCls( - method.resolvedRequestType.build()); - server.register(prefix + capitalize(method.name), binary_handler, - serialize, deserialize); - }); - }, this); - } - - /** - * Binds the server to the given port, with SSL enabled if secure is specified - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {boolean=} secure Whether the server should open a secure port - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.bind = function(port, secure) { - return this.inner_server.bind(port, secure); - }; - - /** - * Starts the server listening on any bound ports - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.listen = function() { - this.inner_server.start(); - return this; - }; - - /** - * Shuts the server down; tells it to stop listening for new requests and to - * kill old requests. - */ - SurfaceServer.prototype.shutdown = function() { - this.inner_server.shutdown(); - }; - - return SurfaceServer; -} - -/** - * See documentation for makeServerConstructor - */ -exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js deleted file mode 100644 index 1db9f694678..00000000000 --- a/src/node/test/client_server_test.js +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var fs = require('fs'); -var path = require('path'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); -var client = require('../src/client'); -var common = require('../src/common'); - -var ca_path = path.join(__dirname, 'data/ca.pem'); - -var key_path = path.join(__dirname, 'data/server1.key'); - -var pem_path = path.join(__dirname, 'data/server1.pem'); - -/** - * Helper function to return an absolute deadline given a relative timeout in - * seconds. - * @param {number} timeout_secs The number of seconds to wait before timing out - * @return {Date} A date timeout_secs in the future - */ -function getDeadline(timeout_secs) { - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + timeout_secs); - return deadline; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -/** - * Responds to every request with an error status - * @param {Stream} stream - */ -function errorHandler(stream) { - throw { - 'code' : grpc.status.UNIMPLEMENTED, - 'details' : 'error details' - }; -} - -/** - * Wait for a cancellation instead of responding - * @param {Stream} stream - */ -function cancelHandler(stream) { - // do nothing -} - -function metadataHandler(stream, metadata) { - stream.end(); -} - -/** - * Serialize a string to a Buffer - * @param {string} value The string to serialize - * @return {Buffer} The serialized value - */ -function stringSerialize(value) { - return new Buffer(value); -} - -/** - * Deserialize a Buffer to a string - * @param {Buffer} buffer The buffer to deserialize - * @return {string} The string value of the buffer - */ -function stringDeserialize(buffer) { - return buffer.toString(); -} - -describe('echo client', function() { - var server; - var channel; - before(function() { - server = new Server(function getMetadata(method, metadata) { - return {method: [method]}; - }); - var port_num = server.bind('0.0.0.0:0'); - server.register('echo', echoHandler); - server.register('error', errorHandler); - server.register('cancellation', cancelHandler); - server.register('metadata', metadataHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should receive echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); - it('should recieve metadata set by the server', function(done) { - var stream = client.makeRequest(channel, 'metadata'); - stream.on('metadata', function(metadata) { - assert.strictEqual(metadata.method[0].toString(), 'metadata'); - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - done(); - }); - stream.end(); - }); - it('should get an error status that the server throws', function(done) { - var stream = client.makeRequest(channel, 'error'); - - stream.on('data', function() {}); - stream.write(new Buffer('test')); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - assert.equal(status.details, 'error details'); - done(); - }); - }); - it('should be able to cancel a call', function(done) { - var stream = client.makeRequest( - channel, - 'cancellation', - null, - getDeadline(1)); - - stream.cancel(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.CANCELLED); - done(); - }); - }); - it('should get correct status for unimplemented method', function(done) { - var stream = client.makeRequest(channel, 'unimplemented_method'); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - done(); - }); - }); -}); -/* TODO(mlumish): explore options for reducing duplication between this test - * and the insecure echo client test */ -describe('secure echo client', function() { - var server; - var channel; - before(function(done) { - fs.readFile(ca_path, function(err, ca_data) { - assert.ifError(err); - fs.readFile(key_path, function(err, key_data) { - assert.ifError(err); - fs.readFile(pem_path, function(err, pem_data) { - assert.ifError(err); - var creds = grpc.Credentials.createSsl(ca_data); - var server_creds = grpc.ServerCredentials.createSsl(null, - key_data, - pem_data); - - server = new Server(null, {'credentials' : server_creds}); - var port_num = server.bind('0.0.0.0:0', true); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num, { - 'grpc.ssl_target_name_override' : 'foo.test.google.com', - 'credentials' : creds - }); - done(); - }); - }); - }); - }); - after(function() { - server.shutdown(); - }); - it('should recieve echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); -}); diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 7ecaad833d4..81cd9fa5b95 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -56,7 +56,7 @@ describe('Interop tests', function() { interop_client.runTest(port, name_override, 'empty_unary', true, done); }); // This fails due to an unknown bug - it.skip('should pass large_unary', function(done) { + it('should pass large_unary', function(done) { interop_client.runTest(port, name_override, 'large_unary', true, done); }); it('should pass client_streaming', function(done) { diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index f347b18ea0f..61b4a2fa2fe 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -66,7 +66,7 @@ describe('Math client', function() { done(); }); }); - it.only('should handle a server streaming request', function(done) { + it('should handle a server streaming request', function(done) { var call = math_client.fib({limit: 7}); var expected_results = [1, 1, 2, 3, 5, 8, 13]; var next_expected = 0; diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js deleted file mode 100644 index a3e1edf50f9..00000000000 --- a/src/node/test/server_test.js +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); - -/** - * This is used for testing functions with multiple asynchronous calls that - * can happen in different orders. This should be passed the number of async - * function invocations that can occur last, and each of those should call this - * function's return value - * @param {function()} done The function that should be called when a test is - * complete. - * @param {number} count The number of calls to the resulting function if the - * test passes. - * @return {function()} The function that should be called at the end of each - * sequence of asynchronous functions. - */ -function multiDone(done, count) { - return function() { - count -= 1; - if (count <= 0) { - done(); - } - }; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -describe('echo server', function() { - var server; - var channel; - before(function() { - server = new Server(); - var port_num = server.bind('[::]:0'); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should echo inputs as responses', function(done) { - done = multiDone(done, 4); - - var req_text = 'echo test string'; - var status_text = 'OK'; - - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + 3); - var call = new grpc.Call(channel, - 'echo', - deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - done(); - }); - }); -}); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 1038f9ab337..34e4ab4013c 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -33,9 +33,9 @@ var assert = require('assert'); -var surface_server = require('../src/surface_server.js'); +var surface_server = require('../src/server.js'); -var surface_client = require('../src/surface_client.js'); +var surface_client = require('../src/client.js'); var ProtoBuf = require('protobufjs');