From 684f6f4359a4c09b3430ae8da80f9db5ac00a603 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 28 Jan 2015 11:47:23 -0800 Subject: [PATCH 1/2] Added metadata pass-through to server method handlers --- src/node/src/surface_server.js | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js index af23ec211cd..cf342fc9fd0 100644 --- a/src/node/src/surface_server.js +++ b/src/node/src/surface_server.js @@ -129,16 +129,18 @@ ServerWritableObjectStream.prototype._write = _write; /** * Creates a binary stream handler function from a unary handler function - * @param {function(Object, function(Error, *))} handler Unary call handler - * @return {function(stream)} Binary stream handler + * @param {function(Object, function(Error, *), metadata=)} handler Unary call + * handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeUnaryHandler(handler) { /** * Handles a stream by reading a single data value, passing it to the handler, * and writing the response back to the stream. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleUnaryCall(stream) { + return function handleUnaryCall(stream, metadata) { stream.on('data', function handleUnaryData(value) { var call = {request: value}; Object.defineProperty(call, 'cancelled', { @@ -154,7 +156,7 @@ function makeUnaryHandler(handler) { stream.write(value); stream.end(); } - }); + }, metadata); }); }; } @@ -162,17 +164,18 @@ function makeUnaryHandler(handler) { /** * Creates a binary stream handler function from a client stream handler * function - * @param {function(Readable, function(Error, *))} handler Client stream call - * handler - * @return {function(stream)} Binary stream handler + * @param {function(Readable, function(Error, *), metadata=)} handler Client + * stream call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeClientStreamHandler(handler) { /** * Handles a stream by passing a deserializing stream to the handler and * writing the response back to the stream. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleClientStreamCall(stream) { + return function handleClientStreamCall(stream, metadata) { var object_stream = new ServerReadableObjectStream(stream); handler(object_stream, function sendClientStreamData(err, value) { if (err) { @@ -181,35 +184,36 @@ function makeClientStreamHandler(handler) { stream.write(value); stream.end(); } - }); + }, metadata); }; } /** * Creates a binary stream handler function from a server stream handler * function - * @param {function(Writable)} handler Server stream call handler - * @return {function(stream)} Binary stream handler + * @param {function(Writable, metadata=)} handler Server stream call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeServerStreamHandler(handler) { /** * Handles a stream by attaching it to a serializing stream, and passing it to * the handler. * @param {stream} stream Binary data stream + * @param {metadata=} metadata Incoming metadata array */ - return function handleServerStreamCall(stream) { + return function handleServerStreamCall(stream, metadata) { stream.on('data', function handleClientData(value) { var object_stream = new ServerWritableObjectStream(stream); object_stream.request = value; - handler(object_stream); + handler(object_stream, metadata); }); }; } /** * Creates a binary stream handler function from a bidi stream handler function - * @param {function(Duplex)} handler Unary call handler - * @return {function(stream)} Binary stream handler + * @param {function(Duplex, metadata=)} handler Unary call handler + * @return {function(stream, metadata=)} Binary stream handler */ function makeBidiStreamHandler(handler) { return handler; From 4d2d0f0f3acd1e90037524803e7cb6b445f9c676 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 29 Jan 2015 11:44:46 -0800 Subject: [PATCH 2/2] Added API for servers to provide metadata --- src/node/interop/interop_server.js | 2 +- src/node/src/server.js | 10 ++++++++-- src/node/src/surface_server.js | 9 ++++++--- src/node/test/client_server_test.js | 28 +++++++++++++++++++++------- src/node/test/surface_test.js | 2 +- 5 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index ebf847876cc..54e9715d1e3 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -183,7 +183,7 @@ function getServer(port, tls) { fullDuplexCall: handleFullDuplex, halfDuplexCall: handleHalfDuplex } - }, options); + }, null, options); var port_num = server.bind('0.0.0.0:' + port, tls); return {server: server, port: port_num}; } diff --git a/src/node/src/server.js b/src/node/src/server.js index 03cdbe6f984..a5d737c68d3 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -202,10 +202,13 @@ GrpcServerStream.prototype._write = function(chunk, encoding, callback) { * Constructs a server object that stores request handlers and delegates * incoming requests to those handlers * @constructor - * @param {Array} options Options that should be passed to the internal server + * @param {function(string, Object>): + Object>=} getMetadata Callback that gets + * metatada for a given method + * @param {Object=} options Options that should be passed to the internal server * implementation */ -function Server(options) { +function Server(getMetadata, options) { this.handlers = {}; var handlers = this.handlers; var server = new grpc.Server(options); @@ -249,6 +252,9 @@ function Server(options) { stream.emit('cancelled'); } }, 0); + if (getMetadata) { + call.addMetadata(getMetadata(data.method, data.metadata)); + } call.serverEndInitialMetadata(0); var stream = new GrpcServerStream(call, handler.serialize, handler.deserialize); diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js index cf342fc9fd0..a47d1fa23d5 100644 --- a/src/node/src/surface_server.js +++ b/src/node/src/surface_server.js @@ -256,10 +256,13 @@ function makeServerConstructor(services) { * @constructor * @param {Object} service_handlers Map from service names to map from method * names to handlers - * @param {Object} options Options to pass to the underlying server + * @param {function(string, Object>): + Object>=} getMetadata Callback that + * gets metatada for a given method + * @param {Object=} options Options to pass to the underlying server */ - function SurfaceServer(service_handlers, options) { - var server = new Server(options); + function SurfaceServer(service_handlers, getMetadata, options) { + var server = new Server(getMetadata, options); this.inner_server = server; _.each(services, function(service) { var service_name = common.fullyQualifiedName(service); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js index 9e1b2a79c87..059dd1323af 100644 --- a/src/node/test/client_server_test.js +++ b/src/node/test/client_server_test.js @@ -84,6 +84,10 @@ function cancelHandler(stream) { // do nothing } +function metadataHandler(stream, metadata) { + stream.end(); +} + /** * Serialize a string to a Buffer * @param {string} value The string to serialize @@ -106,11 +110,14 @@ describe('echo client', function() { var server; var channel; before(function() { - server = new Server(); + server = new Server(function getMetadata(method, metadata) { + return {method: [method]}; + }); var port_num = server.bind('0.0.0.0:0'); server.register('echo', echoHandler); server.register('error', errorHandler); server.register('cancellation', cancelHandler); + server.register('metadata', metadataHandler); server.start(); channel = new grpc.Channel('localhost:' + port_num); @@ -142,12 +149,19 @@ describe('echo client', function() { done(); }); }); + it('should recieve metadata set by the server', function(done) { + var stream = client.makeRequest(channel, 'metadata'); + stream.on('metadata', function(metadata) { + assert.strictEqual(metadata.method[0].toString(), 'metadata'); + }); + stream.on('status', function(status) { + assert.equal(status.code, client.status.OK); + done(); + }); + stream.end(); + }); it('should get an error status that the server throws', function(done) { - var stream = client.makeRequest( - channel, - 'error', - null, - getDeadline(1)); + var stream = client.makeRequest(channel, 'error'); stream.on('data', function() {}); stream.write(new Buffer('test')); @@ -189,7 +203,7 @@ describe('secure echo client', function() { key_data, pem_data); - server = new Server({'credentials' : server_creds}); + server = new Server(null, {'credentials' : server_creds}); var port_num = server.bind('0.0.0.0:0', true); server.register('echo', echoHandler); server.start(); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 85f4841d4bc..1038f9ab337 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -75,7 +75,7 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); -describe('Surface client', function() { +describe('Cancelling surface client', function() { var client; var server; before(function() {