All tests but one now pass against new API

pull/504/head
murgatroid99 10 years ago
parent e7879557c6
commit 10ac96cb8f
  1. 2
      src/node/interop/interop_client.js
  2. 1
      src/node/interop/interop_server.js
  3. 27
      src/node/src/client.js
  4. 15
      src/node/src/server.js
  5. 357
      src/node/src/surface_client.js
  6. 340
      src/node/src/surface_server.js
  7. 255
      src/node/test/client_server_test.js
  8. 2
      src/node/test/interop_sanity_test.js
  9. 2
      src/node/test/math_client_test.js
  10. 122
      src/node/test/server_test.js
  11. 4
      src/node/test/surface_test.js

@ -145,8 +145,8 @@ function serverStreaming(client, done) {
resp_index += 1; resp_index += 1;
}); });
call.on('status', function(status) { call.on('status', function(status) {
assert.strictEqual(resp_index, 4);
assert.strictEqual(status.code, grpc.status.OK); assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(resp_index, 4);
if (done) { if (done) {
done(); done();
} }

@ -106,6 +106,7 @@ function handleStreamingOutput(call) {
testProto.PayloadType.COMPRESSABLE, testProto.PayloadType.COMPRESSABLE,
testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1]; testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1];
} }
console.log('req:', req);
_.each(req.response_parameters, function(resp_param) { _.each(req.response_parameters, function(resp_param) {
call.write({ call.write({
payload: { payload: {

@ -56,6 +56,7 @@ function ClientWritableStream(call, serialize) {
this.call = call; this.call = call;
this.serialize = common.wrapIgnoreNull(serialize); this.serialize = common.wrapIgnoreNull(serialize);
this.on('finish', function() { this.on('finish', function() {
console.log('Send close from client');
var batch = {}; var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(batch, function() {}); call.startBatch(batch, function() {});
@ -90,7 +91,7 @@ function ClientReadableStream(call, deserialize) {
this.call = call; this.call = call;
this.finished = false; this.finished = false;
this.reading = false; this.reading = false;
this.serialize = common.wrapIgnoreNull(deserialize); this.deserialize = common.wrapIgnoreNull(deserialize);
} }
function _read(size) { function _read(size) {
@ -100,12 +101,15 @@ function _read(size) {
* the read queue and starts reading again if applicable * the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object * @param {grpc.Event} event READ event object
*/ */
function readCallback(event) { function readCallback(err, event) {
if (err) {
throw err;
}
if (self.finished) { if (self.finished) {
self.push(null); self.push(null);
return; return;
} }
var data = event.data; var data = event.read;
if (self.push(self.deserialize(data)) && data != null) { if (self.push(self.deserialize(data)) && data != null) {
var read_batch = {}; var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true; read_batch[grpc.opType.RECV_MESSAGE] = true;
@ -142,12 +146,18 @@ util.inherits(ClientDuplexStream, Duplex);
function ClientDuplexStream(call, serialize, deserialize) { function ClientDuplexStream(call, serialize, deserialize) {
Duplex.call(this, {objectMode: true}); Duplex.call(this, {objectMode: true});
this.serialize = common.wrapIgnoreNull(serialize); this.serialize = common.wrapIgnoreNull(serialize);
this.serialize = common.wrapIgnoreNull(deserialize); this.deserialize = common.wrapIgnoreNull(deserialize);
var self = this; var self = this;
var finished = false; var finished = false;
// Indicates that a read is currently pending // Indicates that a read is currently pending
var reading = false; var reading = false;
this.call = call; this.call = call;
this.on('finish', function() {
console.log('Send close from client');
var batch = {};
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(batch, function() {});
});
} }
ClientDuplexStream.prototype._read = _read; ClientDuplexStream.prototype._read = _read;
@ -208,6 +218,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(err); callback(err);
return; return;
} }
if (response.status.code != grpc.status.OK) {
callback(response.status);
return;
}
emitter.emit('status', response.status); emitter.emit('status', response.status);
emitter.emit('metadata', response.metadata); emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read)); callback(null, deserialize(response.read));
@ -265,6 +279,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(err); callback(err);
return; return;
} }
console.log(response);
if (response.status.code != grpc.status.OK) {
callback(response.status);
return;
}
stream.emit('status', response.status); stream.emit('status', response.status);
callback(null, deserialize(response.read)); callback(null, deserialize(response.read));
}); });

@ -89,11 +89,13 @@ function sendUnaryResponse(call, value, serialize) {
function setUpWritable(stream, serialize) { function setUpWritable(stream, serialize) {
stream.finished = false; stream.finished = false;
stream.status = { stream.status = {
'code' : grpc.status.OK, code : grpc.status.OK,
'details' : 'OK' details : 'OK',
metadata : {}
}; };
stream.serialize = common.wrapIgnoreNull(serialize); stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() { function sendStatus() {
console.log('Server sending status');
var batch = {}; var batch = {};
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){}); stream.call.startBatch(batch, function(){});
@ -115,7 +117,7 @@ function setUpWritable(stream, serialize) {
details = err.details; details = err.details;
} }
} }
stream.status = {'code': code, 'details': details}; stream.status = {code: code, details: details, metadata: {}};
} }
/** /**
* Terminate the call. This includes indicating that reads are done, draining * Terminate the call. This includes indicating that reads are done, draining
@ -167,6 +169,7 @@ function ServerWritableStream(call, serialize) {
function _write(chunk, encoding, callback) { function _write(chunk, encoding, callback) {
var batch = {}; var batch = {};
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
console.log('Server writing', batch);
this.call.startBatch(batch, function(err, value) { this.call.startBatch(batch, function(err, value) {
if (err) { if (err) {
this.emit('error', err); this.emit('error', err);
@ -204,11 +207,14 @@ function _read(size) {
return; return;
} }
if (self.finished) { if (self.finished) {
console.log('Pushing null');
self.push(null); self.push(null);
return; return;
} }
var data = event.read; var data = event.read;
console.log(data);
if (self.push(self.deserialize(data)) && data != null) { if (self.push(self.deserialize(data)) && data != null) {
console.log('Reading again');
var read_batch = {}; var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true; read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback); self.call.startBatch(read_batch, readCallback);
@ -234,6 +240,7 @@ util.inherits(ServerDuplexStream, Duplex);
function ServerDuplexStream(call, serialize, deserialize) { function ServerDuplexStream(call, serialize, deserialize) {
Duplex.call(this, {objectMode: true}); Duplex.call(this, {objectMode: true});
this.call = call;
setUpWritable(this, serialize); setUpWritable(this, serialize);
setUpReadable(this, deserialize); setUpReadable(this, deserialize);
} }
@ -280,7 +287,7 @@ function handleServerStreaming(call, handler, metadata) {
stream.emit('error', err); stream.emit('error', err);
return; return;
} }
stream.request = result.read; stream.request = handler.deserialize(result.read);
handler.func(stream); handler.func(stream);
}); });
} }

@ -1,357 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
var decapitalize = require('underscore.string/decapitalize');
var client = require('./client.js');
var common = require('./common.js');
var EventEmitter = require('events').EventEmitter;
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
function forwardEvent(fromEmitter, toEmitter, event) {
fromEmitter.on(event, function forward() {
_.partial(toEmitter.emit, event).apply(toEmitter, arguments);
});
}
util.inherits(ClientReadableObjectStream, Readable);
/**
* Class for representing a gRPC server streaming call as a Node stream on the
* client side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
function ClientReadableObjectStream(stream) {
var options = {objectMode: true};
Readable.call(this, options);
this._stream = stream;
var self = this;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) {
self._stream.pause();
}
});
this._stream.pause();
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ClientReadableObjectStream}
* @param {number} size Ignored
*/
function _read(size) {
this._stream.resume();
}
/**
* See docs for _read
*/
ClientReadableObjectStream.prototype._read = _read;
util.inherits(ClientWritableObjectStream, Writable);
/**
* Class for representing a gRPC client streaming call as a Node stream on the
* client side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
function ClientWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
forwardEvent(stream, this, 'status');
forwardEvent(stream, this, 'metadata');
this.on('finish', function() {
this._stream.end();
});
}
/**
* _write implementation for both types of streams that allow writing
* @this {ClientWritableObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(chunk, encoding, callback);
}
/**
* See docs for _write
*/
ClientWritableObjectStream.prototype._write = _write;
/**
* Cancel the underlying call
*/
function cancel() {
this._stream.cancel();
}
ClientReadableObjectStream.prototype.cancel = cancel;
ClientWritableObjectStream.prototype.cancel = cancel;
/**
* Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeUnaryRequest
*/
function makeUnaryRequestFunction(method, serialize, deserialize) {
/**
* Make a unary request with this method on the given channel with the given
* argument, callback, etc.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeUnaryRequest(argument, callback, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var emitter = new EventEmitter();
emitter.cancel = function cancel() {
stream.cancel();
};
forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata');
stream.write(argument);
stream.end();
stream.on('data', function forwardData(chunk) {
try {
callback(null, chunk);
} catch (e) {
callback(e);
}
});
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return emitter;
}
return makeUnaryRequest;
}
/**
* Get a function that can make client stream requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeClientStreamRequest
*/
function makeClientStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a client stream request with this method on the given channel with the
* given callback, etc.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeClientStreamRequest(callback, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var obj_stream = new ClientWritableObjectStream(stream);
stream.on('data', function forwardData(chunk) {
try {
callback(null, chunk);
} catch (e) {
callback(e);
}
});
stream.on('status', function forwardStatus(status) {
if (status.code !== client.status.OK) {
callback(status);
}
});
return obj_stream;
}
return makeClientStreamRequest;
}
/**
* Get a function that can make server stream requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeServerStreamRequest
*/
function makeServerStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a server stream request with this method on the given channel with the
* given argument, etc.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeServerStreamRequest(argument, metadata, deadline) {
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var obj_stream = new ClientReadableObjectStream(stream);
stream.write(argument);
stream.end();
return obj_stream;
}
return makeServerStreamRequest;
}
/**
* Get a function that can make bidirectional stream requests to the specified
* method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
* @param {function(Buffer)} deserialize The deserialization function for
* outputs
* @return {Function} makeBidiStreamRequest
*/
function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a bidirectional stream request with this method on the given channel.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {(number|Date)=} deadline The deadline for processing this request.
* Defaults to infinite future
* @return {EventEmitter} An event emitter for stream related events
*/
function makeBidiStreamRequest(metadata, deadline) {
return client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
}
return makeBidiStreamRequest;
}
/**
* Map with short names for each of the requester maker functions. Used in
* makeClientConstructor
*/
var requester_makers = {
unary: makeUnaryRequestFunction,
server_stream: makeServerStreamRequestFunction,
client_stream: makeClientStreamRequestFunction,
bidi: makeBidiStreamRequestFunction
}
/**
* Creates a constructor for clients for the given service
* @param {ProtoBuf.Reflect.Service} service The service to generate a client
* for
* @return {function(string, Object)} New client constructor
*/
function makeClientConstructor(service) {
var prefix = '/' + common.fullyQualifiedName(service) + '/';
/**
* Create a client with the given methods
* @constructor
* @param {string} address The address of the server to connect to
* @param {Object} options Options to pass to the underlying channel
*/
function SurfaceClient(address, options) {
this.channel = new client.Channel(address, options);
}
_.each(service.children, function(method) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
SurfaceClient.prototype[decapitalize(method.name)] =
requester_makers[method_type](
prefix + capitalize(method.name),
common.serializeCls(method.resolvedRequestType.build()),
common.deserializeCls(method.resolvedResponseType.build()));
});
SurfaceClient.service = service;
return SurfaceClient;
}
exports.makeClientConstructor = makeClientConstructor;
/**
* See docs for client.status
*/
exports.status = client.status;
/**
* See docs for client.callError
*/
exports.callError = client.callError;

@ -1,340 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
var decapitalize = require('underscore.string/decapitalize');
var Server = require('./server.js');
var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;
var Duplex = stream.Duplex;
var util = require('util');
var common = require('./common.js');
util.inherits(ServerReadableObjectStream, Readable);
/**
* Class for representing a gRPC client streaming call as a Node stream on the
* server side. Extends from stream.Readable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
function ServerReadableObjectStream(stream) {
var options = {objectMode: true};
Readable.call(this, options);
this._stream = stream;
Object.defineProperty(this, 'cancelled', {
get: function() { return stream.cancelled; }
});
var self = this;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) {
self._stream.pause();
}
});
this._stream.on('end', function forwardEnd() {
self.push(null);
});
this._stream.pause();
}
/**
* _read implementation for both types of streams that allow reading.
* @this {ServerReadableObjectStream|ServerBidiObjectStream}
* @param {number} size Ignored
*/
function _read(size) {
this._stream.resume();
}
/**
* See docs for _read
*/
ServerReadableObjectStream.prototype._read = _read;
util.inherits(ServerWritableObjectStream, Writable);
/**
* Class for representing a gRPC server streaming call as a Node stream on the
* server side. Extends from stream.Writable.
* @constructor
* @param {stream} stream Underlying binary Duplex stream for the call
*/
function ServerWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
this._stream.on('cancelled', function() {
self.emit('cancelled');
});
this.on('finish', function() {
this._stream.end();
});
}
/**
* _write implementation for both types of streams that allow writing
* @this {ServerWritableObjectStream}
* @param {*} chunk The value to write to the stream
* @param {string} encoding Ignored
* @param {function(Error)} callback Callback to call when finished writing
*/
function _write(chunk, encoding, callback) {
this._stream.write(chunk, encoding, callback);
}
/**
* See docs for _write
*/
ServerWritableObjectStream.prototype._write = _write;
/**
* Creates a binary stream handler function from a unary handler function
* @param {function(Object, function(Error, *), metadata=)} handler Unary call
* handler
* @return {function(stream, metadata=)} Binary stream handler
*/
function makeUnaryHandler(handler) {
/**
* Handles a stream by reading a single data value, passing it to the handler,
* and writing the response back to the stream.
* @param {stream} stream Binary data stream
* @param {metadata=} metadata Incoming metadata array
*/
return function handleUnaryCall(stream, metadata) {
stream.on('data', function handleUnaryData(value) {
var call = {request: value};
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
stream.on('cancelled', function() {
call.emit('cancelled');
});
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(value);
stream.end();
}
}, metadata);
});
};
}
/**
* Creates a binary stream handler function from a client stream handler
* function
* @param {function(Readable, function(Error, *), metadata=)} handler Client
* stream call handler
* @return {function(stream, metadata=)} Binary stream handler
*/
function makeClientStreamHandler(handler) {
/**
* Handles a stream by passing a deserializing stream to the handler and
* writing the response back to the stream.
* @param {stream} stream Binary data stream
* @param {metadata=} metadata Incoming metadata array
*/
return function handleClientStreamCall(stream, metadata) {
var object_stream = new ServerReadableObjectStream(stream);
handler(object_stream, function sendClientStreamData(err, value) {
if (err) {
stream.emit('error', err);
} else {
stream.write(value);
stream.end();
}
}, metadata);
};
}
/**
* Creates a binary stream handler function from a server stream handler
* function
* @param {function(Writable, metadata=)} handler Server stream call handler
* @return {function(stream, metadata=)} Binary stream handler
*/
function makeServerStreamHandler(handler) {
/**
* Handles a stream by attaching it to a serializing stream, and passing it to
* the handler.
* @param {stream} stream Binary data stream
* @param {metadata=} metadata Incoming metadata array
*/
return function handleServerStreamCall(stream, metadata) {
stream.on('data', function handleClientData(value) {
var object_stream = new ServerWritableObjectStream(stream);
object_stream.request = value;
handler(object_stream, metadata);
});
};
}
/**
* Creates a binary stream handler function from a bidi stream handler function
* @param {function(Duplex, metadata=)} handler Unary call handler
* @return {function(stream, metadata=)} Binary stream handler
*/
function makeBidiStreamHandler(handler) {
return handler;
}
/**
* Map with short names for each of the handler maker functions. Used in
* makeServerConstructor
*/
var handler_makers = {
unary: makeUnaryHandler,
server_stream: makeServerStreamHandler,
client_stream: makeClientStreamHandler,
bidi: makeBidiStreamHandler
};
/**
* Creates a constructor for servers with a service defined by the methods
* object. The methods object has string keys and values of this form:
* {serialize: function, deserialize: function, client_stream: bool,
* server_stream: bool}
* @param {Object} methods Method descriptor for each method the server should
* expose
* @param {string} prefix The prefex to prepend to each method name
* @return {function(Object, Object)} New server constructor
*/
function makeServerConstructor(services) {
var qual_names = [];
_.each(services, function(service) {
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
});
/**
* Create a server with the given handlers for all of the methods.
* @constructor
* @param {Object} service_handlers Map from service names to map from method
* names to handlers
* @param {function(string, Object<string, Array<Buffer>>):
Object<string, Array<Buffer|string>>=} getMetadata Callback that
* gets metatada for a given method
* @param {Object=} options Options to pass to the underlying server
*/
function SurfaceServer(service_handlers, getMetadata, options) {
var server = new Server(getMetadata, options);
this.inner_server = server;
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
var prefix = '/' + common.fullyQualifiedName(service) + '/';
_.each(service.children, function(method) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
if (service_handlers[service_name][decapitalize(method.name)] ===
undefined) {
throw new Error('Method handler for ' +
common.fullyQualifiedName(method) + ' not provided.');
}
var binary_handler = handler_makers[method_type](
service_handlers[service_name][decapitalize(method.name)]);
var serialize = common.serializeCls(
method.resolvedResponseType.build());
var deserialize = common.deserializeCls(
method.resolvedRequestType.build());
server.register(prefix + capitalize(method.name), binary_handler,
serialize, deserialize);
});
}, this);
}
/**
* Binds the server to the given port, with SSL enabled if secure is specified
* @param {string} port The port that the server should bind on, in the format
* "address:port"
* @param {boolean=} secure Whether the server should open a secure port
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.bind = function(port, secure) {
return this.inner_server.bind(port, secure);
};
/**
* Starts the server listening on any bound ports
* @return {SurfaceServer} this
*/
SurfaceServer.prototype.listen = function() {
this.inner_server.start();
return this;
};
/**
* Shuts the server down; tells it to stop listening for new requests and to
* kill old requests.
*/
SurfaceServer.prototype.shutdown = function() {
this.inner_server.shutdown();
};
return SurfaceServer;
}
/**
* See documentation for makeServerConstructor
*/
exports.makeServerConstructor = makeServerConstructor;

@ -1,255 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var assert = require('assert');
var fs = require('fs');
var path = require('path');
var grpc = require('bindings')('grpc.node');
var Server = require('../src/server');
var client = require('../src/client');
var common = require('../src/common');
var ca_path = path.join(__dirname, 'data/ca.pem');
var key_path = path.join(__dirname, 'data/server1.key');
var pem_path = path.join(__dirname, 'data/server1.pem');
/**
* Helper function to return an absolute deadline given a relative timeout in
* seconds.
* @param {number} timeout_secs The number of seconds to wait before timing out
* @return {Date} A date timeout_secs in the future
*/
function getDeadline(timeout_secs) {
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + timeout_secs);
return deadline;
}
/**
* Responds to every request with the same data as a response
* @param {Stream} stream
*/
function echoHandler(stream) {
stream.pipe(stream);
}
/**
* Responds to every request with an error status
* @param {Stream} stream
*/
function errorHandler(stream) {
throw {
'code' : grpc.status.UNIMPLEMENTED,
'details' : 'error details'
};
}
/**
* Wait for a cancellation instead of responding
* @param {Stream} stream
*/
function cancelHandler(stream) {
// do nothing
}
function metadataHandler(stream, metadata) {
stream.end();
}
/**
* Serialize a string to a Buffer
* @param {string} value The string to serialize
* @return {Buffer} The serialized value
*/
function stringSerialize(value) {
return new Buffer(value);
}
/**
* Deserialize a Buffer to a string
* @param {Buffer} buffer The buffer to deserialize
* @return {string} The string value of the buffer
*/
function stringDeserialize(buffer) {
return buffer.toString();
}
describe('echo client', function() {
var server;
var channel;
before(function() {
server = new Server(function getMetadata(method, metadata) {
return {method: [method]};
});
var port_num = server.bind('0.0.0.0:0');
server.register('echo', echoHandler);
server.register('error', errorHandler);
server.register('cancellation', cancelHandler);
server.register('metadata', metadataHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num);
});
after(function() {
server.shutdown();
});
it('should receive echo responses', function(done) {
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var stream = client.makeRequest(
channel,
'echo',
stringSerialize,
stringDeserialize);
for (var i = 0; i < messages.length; i++) {
stream.write(messages[i]);
}
stream.end();
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk);
index += 1;
});
stream.on('status', function(status) {
assert.equal(status.code, client.status.OK);
});
stream.on('end', function() {
assert.equal(index, messages.length);
done();
});
});
it('should recieve metadata set by the server', function(done) {
var stream = client.makeRequest(channel, 'metadata');
stream.on('metadata', function(metadata) {
assert.strictEqual(metadata.method[0].toString(), 'metadata');
});
stream.on('status', function(status) {
assert.equal(status.code, client.status.OK);
done();
});
stream.end();
});
it('should get an error status that the server throws', function(done) {
var stream = client.makeRequest(channel, 'error');
stream.on('data', function() {});
stream.write(new Buffer('test'));
stream.end();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.UNIMPLEMENTED);
assert.equal(status.details, 'error details');
done();
});
});
it('should be able to cancel a call', function(done) {
var stream = client.makeRequest(
channel,
'cancellation',
null,
getDeadline(1));
stream.cancel();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.CANCELLED);
done();
});
});
it('should get correct status for unimplemented method', function(done) {
var stream = client.makeRequest(channel, 'unimplemented_method');
stream.end();
stream.on('status', function(status) {
assert.equal(status.code, grpc.status.UNIMPLEMENTED);
done();
});
});
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
describe('secure echo client', function() {
var server;
var channel;
before(function(done) {
fs.readFile(ca_path, function(err, ca_data) {
assert.ifError(err);
fs.readFile(key_path, function(err, key_data) {
assert.ifError(err);
fs.readFile(pem_path, function(err, pem_data) {
assert.ifError(err);
var creds = grpc.Credentials.createSsl(ca_data);
var server_creds = grpc.ServerCredentials.createSsl(null,
key_data,
pem_data);
server = new Server(null, {'credentials' : server_creds});
var port_num = server.bind('0.0.0.0:0', true);
server.register('echo', echoHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num, {
'grpc.ssl_target_name_override' : 'foo.test.google.com',
'credentials' : creds
});
done();
});
});
});
});
after(function() {
server.shutdown();
});
it('should recieve echo responses', function(done) {
var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
var stream = client.makeRequest(
channel,
'echo',
stringSerialize,
stringDeserialize);
for (var i = 0; i < messages.length; i++) {
stream.write(messages[i]);
}
stream.end();
var index = 0;
stream.on('data', function(chunk) {
assert.equal(messages[index], chunk);
index += 1;
});
stream.on('status', function(status) {
assert.equal(status.code, client.status.OK);
});
stream.on('end', function() {
assert.equal(index, messages.length);
done();
});
});
});

@ -56,7 +56,7 @@ describe('Interop tests', function() {
interop_client.runTest(port, name_override, 'empty_unary', true, done); interop_client.runTest(port, name_override, 'empty_unary', true, done);
}); });
// This fails due to an unknown bug // This fails due to an unknown bug
it.skip('should pass large_unary', function(done) { it('should pass large_unary', function(done) {
interop_client.runTest(port, name_override, 'large_unary', true, done); interop_client.runTest(port, name_override, 'large_unary', true, done);
}); });
it('should pass client_streaming', function(done) { it('should pass client_streaming', function(done) {

@ -66,7 +66,7 @@ describe('Math client', function() {
done(); done();
}); });
}); });
it.only('should handle a server streaming request', function(done) { it('should handle a server streaming request', function(done) {
var call = math_client.fib({limit: 7}); var call = math_client.fib({limit: 7});
var expected_results = [1, 1, 2, 3, 5, 8, 13]; var expected_results = [1, 1, 2, 3, 5, 8, 13];
var next_expected = 0; var next_expected = 0;

@ -1,122 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
var Server = require('../src/server');
/**
* This is used for testing functions with multiple asynchronous calls that
* can happen in different orders. This should be passed the number of async
* function invocations that can occur last, and each of those should call this
* function's return value
* @param {function()} done The function that should be called when a test is
* complete.
* @param {number} count The number of calls to the resulting function if the
* test passes.
* @return {function()} The function that should be called at the end of each
* sequence of asynchronous functions.
*/
function multiDone(done, count) {
return function() {
count -= 1;
if (count <= 0) {
done();
}
};
}
/**
* Responds to every request with the same data as a response
* @param {Stream} stream
*/
function echoHandler(stream) {
stream.pipe(stream);
}
describe('echo server', function() {
var server;
var channel;
before(function() {
server = new Server();
var port_num = server.bind('[::]:0');
server.register('echo', echoHandler);
server.start();
channel = new grpc.Channel('localhost:' + port_num);
});
after(function() {
server.shutdown();
});
it('should echo inputs as responses', function(done) {
done = multiDone(done, 4);
var req_text = 'echo test string';
var status_text = 'OK';
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var call = new grpc.Call(channel,
'echo',
deadline);
call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
},function(event) {
assert.strictEqual(event.type, grpc.completionType.FINISHED);
var status = event.data;
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(status.details, status_text);
done();
}, 0);
call.startWrite(
new Buffer(req_text),
function(event) {
assert.strictEqual(event.type,
grpc.completionType.WRITE_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
call.writesDone(function(event) {
assert.strictEqual(event.type,
grpc.completionType.FINISH_ACCEPTED);
assert.strictEqual(event.data, grpc.opError.OK);
done();
});
}, 0);
call.startRead(function(event) {
assert.strictEqual(event.type, grpc.completionType.READ);
assert.strictEqual(event.data.toString(), req_text);
done();
});
});
});

@ -33,9 +33,9 @@
var assert = require('assert'); var assert = require('assert');
var surface_server = require('../src/surface_server.js'); var surface_server = require('../src/server.js');
var surface_client = require('../src/surface_client.js'); var surface_client = require('../src/client.js');
var ProtoBuf = require('protobufjs'); var ProtoBuf = require('protobufjs');

Loading…
Cancel
Save