Added new Metadata class to abstract over internal representation and normalize keys

pull/3012/head
murgatroid99 9 years ago
parent 89bede02f1
commit 84e3cdeb97
  1. 15
      src/node/index.js
  2. 8
      src/node/interop/interop_client.js
  3. 65
      src/node/src/client.js
  4. 167
      src/node/src/metadata.js
  5. 60
      src/node/src/server.js
  6. 2
      src/node/test/health_test.js
  7. 172
      src/node/test/metadata_test.js
  8. 61
      src/node/test/surface_test.js

@ -41,6 +41,8 @@ var client = require('./src/client.js');
var server = require('./src/server.js'); var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
var grpc = require('bindings')('grpc'); var grpc = require('bindings')('grpc');
/** /**
@ -107,18 +109,12 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
* @param {function(Error, Object)} callback * @param {function(Error, Object)} callback
*/ */
return function updateMetadata(authURI, metadata, callback) { return function updateMetadata(authURI, metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
credential.getRequestMetadata(authURI, function(err, header) { credential.getRequestMetadata(authURI, function(err, header) {
if (err) { if (err) {
callback(err); callback(err);
return; return;
} }
metadata.Authorization.push(header.Authorization); metadata.add('authorization', header.Authorization);
callback(null, metadata); callback(null, metadata);
}); });
}; };
@ -129,6 +125,11 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
*/ */
exports.Server = server.Server; exports.Server = server.Server;
/**
* @see module:src/metadata
*/
exports.Metadata = Metadata;
/** /**
* Status name to code number mapping * Status name to code number mapping
*/ */

@ -321,13 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
credential.getAccessToken(function(err, token) { credential.getAccessToken(function(err, token) {
assert.ifError(err); assert.ifError(err);
var updateMetadata = function(authURI, metadata, callback) { var updateMetadata = function(authURI, metadata, callback) {
metadata = _.clone(metadata); metadata.Add('authorization', 'Bearer ' + token);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
metadata.Authorization.push('Bearer ' + token);
callback(null, metadata); callback(null, metadata);
}; };
var makeTestCall = function(error, client_metadata) { var makeTestCall = function(error, client_metadata) {

@ -42,7 +42,9 @@ var _ = require('lodash');
var grpc = require('bindings')('grpc.node'); var grpc = require('bindings')('grpc.node');
var common = require('./common.js'); var common = require('./common');
var Metadata = require('./metadata');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
@ -254,8 +256,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* serialize * serialize
* @param {function(?Error, value=)} callback The callback to for when the * @param {function(?Error, value=)} callback The callback to for when the
* response is received * response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the * @param {Metadata=} metadata Metadata to add to the call
* call
* @param {Object=} options Options map * @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events * @return {EventEmitter} An event emitter for stream related events
*/ */
@ -264,7 +265,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var emitter = new EventEmitter(); var emitter = new EventEmitter();
var call = getCall(this.channel, method, options); var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) { if (metadata === null || metadata === undefined) {
metadata = {}; metadata = new Metadata();
} else {
metadata = metadata.clone();
} }
emitter.cancel = function cancel() { emitter.cancel = function cancel() {
call.cancel(); call.cancel();
@ -281,7 +284,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var client_batch = {}; var client_batch = {};
var message = serialize(argument); var message = serialize(argument);
message.grpcWriteFlags = options.flags; message.grpcWriteFlags = options.flags;
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
@ -292,7 +296,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (response.status.code !== grpc.status.OK) { if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details); var error = new Error(response.status.details);
error.code = response.status.code; error.code = response.status.code;
error.metadata = response.status.metadata; error.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
callback(error); callback(error);
return; return;
} else { } else {
@ -302,7 +307,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return; return;
} }
} }
emitter.emit('metadata', response.metadata); emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
callback(null, deserialize(response.read)); callback(null, deserialize(response.read));
}); });
}); });
@ -326,7 +332,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* @this {Client} Client object. Must have a channel member. * @this {Client} Client object. Must have a channel member.
* @param {function(?Error, value=)} callback The callback to for when the * @param {function(?Error, value=)} callback The callback to for when the
* response is received * response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call * call
* @param {Object=} options Options map * @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events * @return {EventEmitter} An event emitter for stream related events
@ -335,7 +341,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */ /* jshint validthis: true */
var call = getCall(this.channel, method, options); var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) { if (metadata === null || metadata === undefined) {
metadata = {}; metadata = new Metadata();
} else {
metadata = metadata.clone();
} }
var stream = new ClientWritableStream(call, serialize); var stream = new ClientWritableStream(call, serialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -345,7 +353,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
return; return;
} }
var metadata_batch = {}; var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) { call.startBatch(metadata_batch, function(err, response) {
if (err) { if (err) {
@ -353,7 +362,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
// in the other batch. // in the other batch.
return; return;
} }
stream.emit('metadata', response.metadata); stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
}); });
var client_batch = {}; var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_MESSAGE] = true;
@ -363,7 +373,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
if (response.status.code !== grpc.status.OK) { if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details); var error = new Error(response.status.details);
error.code = response.status.code; error.code = response.status.code;
error.metadata = response.status.metadata; error.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
callback(error); callback(error);
return; return;
} else { } else {
@ -396,7 +407,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* @this {SurfaceClient} Client object. Must have a channel member. * @this {SurfaceClient} Client object. Must have a channel member.
* @param {*} argument The argument to the call. Should be serializable with * @param {*} argument The argument to the call. Should be serializable with
* serialize * serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call * call
* @param {Object} options Options map * @param {Object} options Options map
* @return {EventEmitter} An event emitter for stream related events * @return {EventEmitter} An event emitter for stream related events
@ -405,7 +416,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */ /* jshint validthis: true */
var call = getCall(this.channel, method, options); var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) { if (metadata === null || metadata === undefined) {
metadata = {}; metadata = new Metadata();
} else {
metadata = metadata.clone();
} }
var stream = new ClientReadableStream(call, deserialize); var stream = new ClientReadableStream(call, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -417,7 +430,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
var start_batch = {}; var start_batch = {};
var message = serialize(argument); var message = serialize(argument);
message.grpcWriteFlags = options.flags; message.grpcWriteFlags = options.flags;
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@ -427,7 +441,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
// in the other batch. // in the other batch.
return; return;
} }
stream.emit('metadata', response.metadata); stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
}); });
var status_batch = {}; var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
@ -436,7 +451,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
if (response.status.code !== grpc.status.OK) { if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details); var error = new Error(response.status.details);
error.code = response.status.code; error.code = response.status.code;
error.metadata = response.status.metadata; error.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('error', error); stream.emit('error', error);
return; return;
} else { } else {
@ -466,7 +482,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/** /**
* Make a bidirectional stream request with this method on the given channel. * Make a bidirectional stream request with this method on the given channel.
* @this {SurfaceClient} Client object. Must have a channel member. * @this {SurfaceClient} Client object. Must have a channel member.
* @param {array=} metadata Array of metadata key/value pairs to add to the * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call * call
* @param {Options} options Options map * @param {Options} options Options map
* @return {EventEmitter} An event emitter for stream related events * @return {EventEmitter} An event emitter for stream related events
@ -475,7 +491,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */ /* jshint validthis: true */
var call = getCall(this.channel, method, options); var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) { if (metadata === null || metadata === undefined) {
metadata = {}; metadata = new Metadata();
} else {
metadata = metadata.clone();
} }
var stream = new ClientDuplexStream(call, serialize, deserialize); var stream = new ClientDuplexStream(call, serialize, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -485,7 +503,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
return; return;
} }
var start_batch = {}; var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) { call.startBatch(start_batch, function(err, response) {
if (err) { if (err) {
@ -493,7 +512,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
// in the other batch. // in the other batch.
return; return;
} }
stream.emit('metadata', response.metadata); stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
}); });
var status_batch = {}; var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
@ -502,7 +522,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
if (response.status.code !== grpc.status.OK) { if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details); var error = new Error(response.status.details);
error.code = response.status.code; error.code = response.status.code;
error.metadata = response.status.metadata; error.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('error', error); stream.emit('error', error);
return; return;
} else { } else {

@ -0,0 +1,167 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Metadata module
* @module
*/
'use strict';
var _ = require('lodash');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
* @constructor
*/
function Metadata() {
this._internal_repr = {};
}
function normalizeKey(key) {
return _.deburr(key).toLowerCase();
}
function validate(key, value) {
if (_.endsWith(key, '-bin')) {
if (!(value instanceof Buffer)) {
throw new Error('keys that end with \'-bin\' must have Buffer values');
}
} else {
if (!_.isString(value)) {
throw new Error(
'keys that don\'t end with \'-bin\' must have String values');
}
}
}
/**
* Sets the given value for the given key, replacing any other values associated
* with that key. Normalizes the key.
* @param {String} key The key to set
* @param {String|Buffer} value The value to set. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.set = function(key, value) {
key = normalizeKey(key);
validate(key, value);
this._internal_repr[key] = [value];
};
/**
* Adds the given value for the given key. Normalizes the key.
* @param {String} key The key to add to.
* @param {String|Buffer} value The value to add. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.add = function(key, value) {
key = normalizeKey(key);
validate(key, value);
if (!this._internal_repr[key]) {
this._internal_repr[key] = [];
}
this._internal_repr[key].push(value);
};
/**
* Remove the given key and any associated values.
* @param {String} key The key to remove
*/
Metadata.prototype.remove = function(key) {
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
delete this._internal_repr[key];
}
};
/**
* Gets a list of all values associated with the key.
* @param {String} key The key to get
* @return {Array.<String|Buffer>} The values associated with that key
*/
Metadata.prototype.get = function(key) {
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
return this._internal_repr[key];
} else {
return [];
}
};
/**
* Get a map of each key to a single associated value. This reflects the most
* common way that people will want to see metadata.
* @return {Object.<String,String|Buffer>} A key/value mapping of the metadata
*/
Metadata.prototype.getMap = function() {
var result = {};
_.forOwn(this._internal_repr, function(values, key) {
if(values.length > 0) {
result[key] = values[0];
}
});
return result;
};
/**
* Clone the metadata object.
* @return {Metadata} The new cloned object
*/
Metadata.prototype.clone = function() {
var copy = new Metadata();
copy._internal_repr = _.cloneDeep(this._internal_repr);
return copy;
}
/**
* Gets the metadata in the format used by interal code. Intended for internal
* use only. API stability is not guaranteed.
* @private
* @return {Object.<String, Array.<String|Buffer>>} The metadata
*/
Metadata.prototype._getCoreRepresentation = function() {
return this._internal_repr;
};
/**
* Creates a Metadata object from a metadata map in the internal format.
* Intended for internal use only. API stability is not guaranteed.
* @private
* @param {Object.<String, Array.<String|Buffer>>} The metadata
* @return {Metadata} The new Metadata object
*/
Metadata._fromCoreRepresentation = function(metadata) {
var newMetadata = new Metadata();
newMetadata._internal_repr = _.cloneDeep(metadata);
return newMetadata;
};
module.exports = Metadata;

@ -44,6 +44,8 @@ var grpc = require('bindings')('grpc.node');
var common = require('./common'); var common = require('./common');
var Metadata = require('./metadata');
var stream = require('stream'); var stream = require('stream');
var Readable = stream.Readable; var Readable = stream.Readable;
@ -60,10 +62,10 @@ var EventEmitter = require('events').EventEmitter;
* @param {Object} error The error object * @param {Object} error The error object
*/ */
function handleError(call, error) { function handleError(call, error) {
var statusMetadata = new Metadata();
var status = { var status = {
code: grpc.status.UNKNOWN, code: grpc.status.UNKNOWN,
details: 'Unknown Error', details: 'Unknown Error'
metadata: {}
}; };
if (error.hasOwnProperty('message')) { if (error.hasOwnProperty('message')) {
status.details = error.message; status.details = error.message;
@ -75,11 +77,13 @@ function handleError(call, error) {
} }
} }
if (error.hasOwnProperty('metadata')) { if (error.hasOwnProperty('metadata')) {
status.metadata = error.metadata; statusMetadata = error.metadata;
} }
status.metadata = statusMetadata._getCoreRepresentation();
var error_batch = {}; var error_batch = {};
if (!call.metadataSent) { if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; error_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
} }
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){}); call.startBatch(error_batch, function(){});
@ -114,22 +118,24 @@ function waitForCancel(call, emitter) {
* @param {*} value The value to respond with * @param {*} value The value to respond with
* @param {function(*):Buffer=} serialize Serialization function for the * @param {function(*):Buffer=} serialize Serialization function for the
* response * response
* @param {Object=} metadata Optional trailing metadata to send with status * @param {Metadata=} metadata Optional trailing metadata to send with status
* @param {number=} flags Flags for modifying how the message is sent. * @param {number=} flags Flags for modifying how the message is sent.
* Defaults to 0. * Defaults to 0.
*/ */
function sendUnaryResponse(call, value, serialize, metadata, flags) { function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {}; var end_batch = {};
var statusMetadata = new Metadata();
var status = { var status = {
code: grpc.status.OK, code: grpc.status.OK,
details: 'OK', details: 'OK'
metadata: {}
}; };
if (metadata) { if (metadata) {
status.metadata = metadata; statusMetadata = metadata;
} }
status.metadata = statusMetadata._getCoreRepresentation();
if (!call.metadataSent) { if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; end_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
call.metadataSent = true; call.metadataSent = true;
} }
var message = serialize(value); var message = serialize(value);
@ -151,15 +157,17 @@ function setUpWritable(stream, serialize) {
stream.status = { stream.status = {
code : grpc.status.OK, code : grpc.status.OK,
details : 'OK', details : 'OK',
metadata : {} metadata : new Metadata()
}; };
stream.serialize = common.wrapIgnoreNull(serialize); stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() { function sendStatus() {
var batch = {}; var batch = {};
if (!stream.call.metadataSent) { if (!stream.call.metadataSent) {
stream.call.metadataSent = true; stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {}; batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
} }
stream.status.metadata = stream.status.metadata._getCoreRepresentation();
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){}); stream.call.startBatch(batch, function(){});
} }
@ -203,7 +211,7 @@ function setUpWritable(stream, serialize) {
/** /**
* Override of Writable#end method that allows for sending metadata with a * Override of Writable#end method that allows for sending metadata with a
* success status. * success status.
* @param {Object=} metadata Metadata to send with the status * @param {Metadata=} metadata Metadata to send with the status
*/ */
stream.end = function(metadata) { stream.end = function(metadata) {
if (metadata) { if (metadata) {
@ -266,7 +274,8 @@ function _write(chunk, encoding, callback) {
/* jshint validthis: true */ /* jshint validthis: true */
var batch = {}; var batch = {};
if (!this.call.metadataSent) { if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {}; batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
this.call.metadataSent = true; this.call.metadataSent = true;
} }
var message = this.serialize(chunk); var message = this.serialize(chunk);
@ -289,15 +298,15 @@ ServerWritableStream.prototype._write = _write;
/** /**
* Send the initial metadata for a writable stream. * Send the initial metadata for a writable stream.
* @param {Object<String, Array<(String|Buffer)>>} responseMetadata Metadata * @param {Metadata} responseMetadata Metadata to send
* to send
*/ */
function sendMetadata(responseMetadata) { function sendMetadata(responseMetadata) {
/* jshint validthis: true */ /* jshint validthis: true */
if (!this.call.metadataSent) { if (!this.call.metadataSent) {
this.call.metadataSent = true; this.call.metadataSent = true;
var batch = []; var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
this.call.startBatch(batch, function(err) { this.call.startBatch(batch, function(err) {
if (err) { if (err) {
this.emit('error', err); this.emit('error', err);
@ -422,7 +431,7 @@ ServerDuplexStream.prototype.getPeer = getPeer;
* @access private * @access private
* @param {grpc.Call} call The call to handle * @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called * @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client * @param {Metadata} metadata Metadata from the client
*/ */
function handleUnary(call, handler, metadata) { function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter(); var emitter = new EventEmitter();
@ -430,7 +439,8 @@ function handleUnary(call, handler, metadata) {
if (!call.metadataSent) { if (!call.metadataSent) {
call.metadataSent = true; call.metadataSent = true;
var batch = {}; var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {}); call.startBatch(batch, function() {});
} }
}; };
@ -478,7 +488,7 @@ function handleUnary(call, handler, metadata) {
* @access private * @access private
* @param {grpc.Call} call The call to handle * @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called * @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client * @param {Metadata} metadata Metadata from the client
*/ */
function handleServerStreaming(call, handler, metadata) { function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize); var stream = new ServerWritableStream(call, handler.serialize);
@ -507,7 +517,7 @@ function handleServerStreaming(call, handler, metadata) {
* @access private * @access private
* @param {grpc.Call} call The call to handle * @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called * @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client * @param {Metadata} metadata Metadata from the client
*/ */
function handleClientStreaming(call, handler, metadata) { function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize); var stream = new ServerReadableStream(call, handler.deserialize);
@ -515,7 +525,8 @@ function handleClientStreaming(call, handler, metadata) {
if (!call.metadataSent) { if (!call.metadataSent) {
call.metadataSent = true; call.metadataSent = true;
var batch = {}; var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {}); call.startBatch(batch, function() {});
} }
}; };
@ -542,7 +553,7 @@ function handleClientStreaming(call, handler, metadata) {
* @access private * @access private
* @param {grpc.Call} call The call to handle * @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called * @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client * @param {Metadata} metadata Metadata from the client
*/ */
function handleBidiStreaming(call, handler, metadata) { function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize, var stream = new ServerDuplexStream(call, handler.serialize,
@ -599,7 +610,7 @@ function Server(options) {
var details = event.new_call; var details = event.new_call;
var call = details.call; var call = details.call;
var method = details.method; var method = details.method;
var metadata = details.metadata; var metadata = Metadata._fromCoreRepresentation(details.metadata);
if (method === null) { if (method === null) {
return; return;
} }
@ -609,7 +620,8 @@ function Server(options) {
handler = handlers[method]; handler = handlers[method];
} else { } else {
var batch = {}; var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = {}; batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
code: grpc.status.UNIMPLEMENTED, code: grpc.status.UNIMPLEMENTED,
details: 'This method is not available on this server.', details: 'This method is not available on this server.',

@ -39,7 +39,7 @@ var health = require('../health_check/health.js');
var grpc = require('../'); var grpc = require('../');
describe('Health Checking', function() { describe.only('Health Checking', function() {
var statusMap = { var statusMap = {
'': { '': {
'': 'SERVING', '': 'SERVING',

@ -0,0 +1,172 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var Metadata = require('../src/metadata.js');
var assert = require('assert');
describe('Metadata', function() {
var metadata;
beforeEach(function() {
metadata = new Metadata();
});
describe('#set', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.set('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.set('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.set('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.set('key-bin', new Buffer('value'));
});
});
it('Saves values that can be retrieved', function() {
metadata.set('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Overwrites previous values', function() {
metadata.set('key', 'value1');
metadata.set('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
it('Normalizes keys', function() {
metadata.set('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.set('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
});
describe('#add', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.add('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.add('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.add('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.add('key-bin', new Buffer('value'));
});
});
it('Saves values that can be retrieved', function() {
metadata.add('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Combines with previous values', function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('Normalizes keys', function() {
metadata.add('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.add('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
});
describe('#remove', function() {
it('clears values from a key', function() {
metadata.add('key', 'value');
metadata.remove('key');
assert.deepEqual(metadata.get('key'), []);
});
it('does not normalize keys', function() {
metadata.add('key', 'value');
metadata.remove('KEY');
assert.deepEqual(metadata.get('key'), ['value']);
});
});
describe('#get', function() {
beforeEach(function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
metadata.add('key-bin', new Buffer('value'));
});
it('gets all values associated with a key', function() {
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('does not normalize keys', function() {
assert.deepEqual(metadata.get('KEY'), []);
});
it('returns an empty list for non-existent keys', function() {
assert.deepEqual(metadata.get('non-existent-key'), []);
});
it('returns Buffers for "-bin" keys', function() {
assert(metadata.get('key-bin')[0] instanceof Buffer);
});
});
describe('#getMap', function() {
it('gets a map of keys to values', function() {
metadata.add('key1', 'value1');
metadata.add('Key2', 'value2');
metadata.add('KEY3', 'value3');
assert.deepEqual(metadata.getMap(),
{key1: 'value1',
key2: 'value2',
key3: 'value3'});
});
});
describe('#clone', function() {
it('retains values from the original', function() {
metadata.add('key', 'value');
var copy = metadata.clone();
assert.deepEqual(copy.get('key'), ['value']);
});
it('Does not see newly added values', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
metadata.add('key', 'value2');
assert.deepEqual(copy.get('key'), ['value1']);
});
it('Does not add new values to the original', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
copy.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1']);
});
});
});

@ -262,6 +262,7 @@ describe('Generic client and server', function() {
describe('Echo metadata', function() { describe('Echo metadata', function() {
var client; var client;
var server; var server;
var metadata;
before(function() { before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService'); var test_service = test_proto.lookup('TestService');
@ -294,6 +295,8 @@ describe('Echo metadata', function() {
var Client = surface_client.makeProtobufClientConstructor(test_service); var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
server.start(); server.start();
metadata = new grpc.Metadata();
metadata.set('key', 'value');
}); });
after(function() { after(function() {
server.shutdown(); server.shutdown();
@ -301,35 +304,35 @@ describe('Echo metadata', function() {
it('with unary call', function(done) { it('with unary call', function(done) {
var call = client.unary({}, function(err, data) { var call = client.unary({}, function(err, data) {
assert.ifError(err); assert.ifError(err);
}, {key: ['value']}); }, metadata);
call.on('metadata', function(metadata) { call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']); assert.deepEqual(metadata.get('key'), ['value']);
done(); done();
}); });
}); });
it('with client stream call', function(done) { it('with client stream call', function(done) {
var call = client.clientStream(function(err, data) { var call = client.clientStream(function(err, data) {
assert.ifError(err); assert.ifError(err);
}, {key: ['value']}); }, metadata);
call.on('metadata', function(metadata) { call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']); assert.deepEqual(metadata.get('key'), ['value']);
done(); done();
}); });
call.end(); call.end();
}); });
it('with server stream call', function(done) { it('with server stream call', function(done) {
var call = client.serverStream({}, {key: ['value']}); var call = client.serverStream({}, metadata);
call.on('data', function() {}); call.on('data', function() {});
call.on('metadata', function(metadata) { call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']); assert.deepEqual(metadata.get('key'), ['value']);
done(); done();
}); });
}); });
it('with bidi stream call', function(done) { it('with bidi stream call', function(done) {
var call = client.bidiStream({key: ['value']}); var call = client.bidiStream(metadata);
call.on('data', function() {}); call.on('data', function() {});
call.on('metadata', function(metadata) { call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']); assert.deepEqual(metadata.get('key'), ['value']);
done(); done();
}); });
call.end(); call.end();
@ -337,9 +340,10 @@ describe('Echo metadata', function() {
it('shows the correct user-agent string', function(done) { it('shows the correct user-agent string', function(done) {
var version = require('../package.json').version; var version = require('../package.json').version;
var call = client.unary({}, function(err, data) { assert.ifError(err); }, var call = client.unary({}, function(err, data) { assert.ifError(err); },
{key: ['value']}); metadata);
call.on('metadata', function(metadata) { call.on('metadata', function(metadata) {
assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version)); assert(_.startsWith(metadata.get('user-agent')[0],
'grpc-node/' + version));
done(); done();
}); });
}); });
@ -354,13 +358,14 @@ describe('Other conditions', function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
test_service = test_proto.lookup('TestService'); test_service = test_proto.lookup('TestService');
server = new grpc.Server(); server = new grpc.Server();
var trailer_metadata = new grpc.Metadata();
server.addProtoService(test_service, { server.addProtoService(test_service, {
unary: function(call, cb) { unary: function(call, cb) {
var req = call.request; var req = call.request;
if (req.error) { if (req.error) {
cb(new Error('Requested error'), null, {trailer_present: ['yes']}); cb(new Error('Requested error'), null, trailer_metadata);
} else { } else {
cb(null, {count: 1}, {trailer_present: ['yes']}); cb(null, {count: 1}, trailer_metadata);
} }
}, },
clientStream: function(stream, cb){ clientStream: function(stream, cb){
@ -369,14 +374,14 @@ describe('Other conditions', function() {
stream.on('data', function(data) { stream.on('data', function(data) {
if (data.error) { if (data.error) {
errored = true; errored = true;
cb(new Error('Requested error'), null, {trailer_present: ['yes']}); cb(new Error('Requested error'), null, trailer_metadata);
} else { } else {
count += 1; count += 1;
} }
}); });
stream.on('end', function() { stream.on('end', function() {
if (!errored) { if (!errored) {
cb(null, {count: count}, {trailer_present: ['yes']}); cb(null, {count: count}, trailer_metadata);
} }
}); });
}, },
@ -384,13 +389,13 @@ describe('Other conditions', function() {
var req = stream.request; var req = stream.request;
if (req.error) { if (req.error) {
var err = new Error('Requested error'); var err = new Error('Requested error');
err.metadata = {trailer_present: ['yes']}; err.metadata = trailer_metadata;
stream.emit('error', err); stream.emit('error', err);
} else { } else {
for (var i = 0; i < 5; i++) { for (var i = 0; i < 5; i++) {
stream.write({count: i}); stream.write({count: i});
} }
stream.end({trailer_present: ['yes']}); stream.end(trailer_metadata);
} }
}, },
bidiStream: function(stream) { bidiStream: function(stream) {
@ -398,10 +403,8 @@ describe('Other conditions', function() {
stream.on('data', function(data) { stream.on('data', function(data) {
if (data.error) { if (data.error) {
var err = new Error('Requested error'); var err = new Error('Requested error');
err.metadata = { err.metadata = trailer_metadata.clone();
trailer_present: ['yes'], err.metadata.add('count', '' + count);
count: ['' + count]
};
stream.emit('error', err); stream.emit('error', err);
} else { } else {
stream.write({count: count}); stream.write({count: count});
@ -409,7 +412,7 @@ describe('Other conditions', function() {
} }
}); });
stream.on('end', function() { stream.on('end', function() {
stream.end({trailer_present: ['yes']}); stream.end(trailer_metadata);
}); });
} }
}); });
@ -510,7 +513,7 @@ describe('Other conditions', function() {
assert.ifError(err); assert.ifError(err);
}); });
call.on('status', function(status) { call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -519,7 +522,7 @@ describe('Other conditions', function() {
assert(err); assert(err);
}); });
call.on('status', function(status) { call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -531,7 +534,7 @@ describe('Other conditions', function() {
call.write({error: false}); call.write({error: false});
call.end(); call.end();
call.on('status', function(status) { call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -543,7 +546,7 @@ describe('Other conditions', function() {
call.write({error: true}); call.write({error: true});
call.end(); call.end();
call.on('status', function(status) { call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -552,7 +555,7 @@ describe('Other conditions', function() {
call.on('data', function(){}); call.on('data', function(){});
call.on('status', function(status) { call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK); assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -560,7 +563,7 @@ describe('Other conditions', function() {
var call = client.serverStream({error: true}); var call = client.serverStream({error: true});
call.on('data', function(){}); call.on('data', function(){});
call.on('error', function(error) { call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']); assert.deepEqual(error.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -572,7 +575,7 @@ describe('Other conditions', function() {
call.on('data', function(){}); call.on('data', function(){});
call.on('status', function(status) { call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK); assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']); assert.deepEqual(status.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });
@ -583,7 +586,7 @@ describe('Other conditions', function() {
call.end(); call.end();
call.on('data', function(){}); call.on('data', function(){});
call.on('error', function(error) { call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']); assert.deepEqual(error.metadata.get('trailer_present'), ['yes']);
done(); done();
}); });
}); });

Loading…
Cancel
Save