diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 01217bce797..32a8ff55b12 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -120,7 +120,7 @@ Server::Server(grpc_server *server) : wrapped_server(server) { Server::~Server() { this->ShutdownServer(); grpc_completion_queue_shutdown(this->shutdown_queue); - grpc_server_destroy(wrapped_server); + grpc_server_destroy(this->wrapped_server); grpc_completion_queue_destroy(this->shutdown_queue); } @@ -139,8 +139,11 @@ void Server::Init(Handle exports) { NanSetPrototypeTemplate(tpl, "start", NanNew(Start)->GetFunction()); - NanSetPrototypeTemplate(tpl, "shutdown", - NanNew(Shutdown)->GetFunction()); + NanSetPrototypeTemplate(tpl, "tryShutdown", + NanNew(TryShutdown)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "forceShutdown", + NanNew(ForceShutdown)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle ctr = tpl->GetFunction(); @@ -153,14 +156,12 @@ bool Server::HasInstance(Handle val) { } void Server::ShutdownServer() { - if (this->wrapped_server != NULL) { - grpc_server_shutdown_and_notify(this->wrapped_server, - this->shutdown_queue, - NULL); - grpc_completion_queue_pluck(this->shutdown_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - this->wrapped_server = NULL; - } + grpc_server_shutdown_and_notify(this->wrapped_server, + this->shutdown_queue, + NULL); + grpc_server_cancel_all_calls(this->wrapped_server); + grpc_completion_queue_pluck(this->shutdown_queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); } NAN_METHOD(Server::New) { @@ -222,9 +223,6 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); - if (server->wrapped_server == NULL) { - return NanThrowError("requestCall cannot be called on a shut down Server"); - } NewCallOp *op = new NewCallOp(); unique_ptr ops(new OpVec()); ops->push_back(unique_ptr(op)); @@ -256,10 +254,6 @@ NAN_METHOD(Server::AddHttp2Port) { "addHttp2Port's second argument must be ServerCredentials"); } Server *server = ObjectWrap::Unwrap(args.This()); - if (server->wrapped_server == NULL) { - return NanThrowError( - "addHttp2Port cannot be called on a shut down Server"); - } ServerCredentials *creds_object = ObjectWrap::Unwrap( args[1]->ToObject()); grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials(); @@ -281,21 +275,30 @@ NAN_METHOD(Server::Start) { return NanThrowTypeError("start can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); - if (server->wrapped_server == NULL) { - return NanThrowError("start cannot be called on a shut down Server"); - } grpc_server_start(server->wrapped_server); NanReturnUndefined(); } -NAN_METHOD(ShutdownCallback) { +NAN_METHOD(Server::TryShutdown) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("tryShutdown can only be called on a Server"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + unique_ptr ops(new OpVec()); + grpc_server_shutdown_and_notify( + server->wrapped_server, + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(new NanCallback(args[0].As()), ops.release(), + shared_ptr(nullptr))); + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } -NAN_METHOD(Server::Shutdown) { +NAN_METHOD(Server::ForceShutdown) { NanScope(); if (!HasInstance(args.This())) { - return NanThrowTypeError("shutdown can only be called on a Server"); + return NanThrowTypeError("forceShutdown can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); server->ShutdownServer(); diff --git a/src/node/ext/server.h b/src/node/ext/server.h index faab7e3418c..e7d5c3fb11a 100644 --- a/src/node/ext/server.h +++ b/src/node/ext/server.h @@ -67,7 +67,8 @@ class Server : public ::node::ObjectWrap { static NAN_METHOD(RequestCall); static NAN_METHOD(AddHttp2Port); static NAN_METHOD(Start); - static NAN_METHOD(Shutdown); + static NAN_METHOD(TryShutdown); + static NAN_METHOD(ForceShutdown); static NanCallback *constructor; static v8::Persistent fun_tpl; diff --git a/src/node/index.js b/src/node/index.js index 889b0ac0e92..51d3fa590cd 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -41,6 +41,8 @@ var client = require('./src/client.js'); var server = require('./src/server.js'); +var Metadata = require('./src/metadata.js'); + var grpc = require('bindings')('grpc'); /** @@ -107,18 +109,12 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) { * @param {function(Error, Object)} callback */ return function updateMetadata(authURI, metadata, callback) { - metadata = _.clone(metadata); - if (metadata.Authorization) { - metadata.Authorization = _.clone(metadata.Authorization); - } else { - metadata.Authorization = []; - } credential.getRequestMetadata(authURI, function(err, header) { if (err) { callback(err); return; } - metadata.Authorization.push(header.Authorization); + metadata.add('authorization', header.Authorization); callback(null, metadata); }); }; @@ -129,6 +125,11 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) { */ exports.Server = server.Server; +/** + * @see module:src/metadata + */ +exports.Metadata = Metadata; + /** * Status name to code number mapping */ diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 612dcf01f65..8fb8d669206 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -321,13 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { credential.getAccessToken(function(err, token) { assert.ifError(err); var updateMetadata = function(authURI, metadata, callback) { - metadata = _.clone(metadata); - if (metadata.Authorization) { - metadata.Authorization = _.clone(metadata.Authorization); - } else { - metadata.Authorization = []; - } - metadata.Authorization.push('Bearer ' + token); + metadata.Add('authorization', 'Bearer ' + token); callback(null, metadata); }; var makeTestCall = function(error, client_metadata) { diff --git a/src/node/src/client.js b/src/node/src/client.js index 7b7eae51d26..e1bed3512e9 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -42,7 +42,9 @@ var _ = require('lodash'); var grpc = require('bindings')('grpc.node'); -var common = require('./common.js'); +var common = require('./common'); + +var Metadata = require('./metadata'); var EventEmitter = require('events').EventEmitter; @@ -254,8 +256,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * serialize * @param {function(?Error, value=)} callback The callback to for when the * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call + * @param {Metadata=} metadata Metadata to add to the call * @param {Object=} options Options map * @return {EventEmitter} An event emitter for stream related events */ @@ -264,7 +265,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { var emitter = new EventEmitter(); var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } emitter.cancel = function cancel() { call.cancel(); @@ -283,13 +286,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { if (options) { message.grpcWriteFlags = options.flags; } - client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -304,7 +310,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return; } } - emitter.emit('metadata', response.metadata); + emitter.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); callback(null, deserialize(response.read)); }); }); @@ -328,7 +335,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @this {Client} Client object. Must have a channel member. * @param {function(?Error, value=)} callback The callback to for when the * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Object=} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -337,7 +344,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientWritableStream(call, serialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -347,7 +356,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { return; } var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(metadata_batch, function(err, response) { if (err) { @@ -355,12 +365,15 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var client_batch = {}; client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -398,7 +411,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @this {SurfaceClient} Client object. Must have a channel member. * @param {*} argument The argument to the call. Should be serializable with * serialize - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Object} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -407,7 +420,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientReadableStream(call, deserialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -421,7 +436,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { if (options) { message.grpcWriteFlags = options.flags; } - start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -431,11 +447,14 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -470,7 +489,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { /** * Make a bidirectional stream request with this method on the given channel. * @this {SurfaceClient} Client object. Must have a channel member. - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Options} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -479,7 +498,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientDuplexStream(call, serialize, deserialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -489,7 +510,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { return; } var start_batch = {}; - start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(start_batch, function(err, response) { if (err) { @@ -497,11 +519,14 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js new file mode 100644 index 00000000000..65fd91f3672 --- /dev/null +++ b/src/node/src/metadata.js @@ -0,0 +1,179 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * Metadata module + * @module + */ + +'use strict'; + +var _ = require('lodash'); + +/** + * Class for storing metadata. Keys are normalized to lowercase ASCII. + * @constructor + */ +function Metadata() { + this._internal_repr = {}; +} + +function normalizeKey(key) { + if (!(/^[A-Za-z\d_-]+$/.test(key))) { + throw new Error('Metadata keys must be nonempty strings containing only ' + + 'alphanumeric characters and hyphens'); + } + return key.toLowerCase(); +} + +function validate(key, value) { + if (_.endsWith(key, '-bin')) { + if (!(value instanceof Buffer)) { + throw new Error('keys that end with \'-bin\' must have Buffer values'); + } + } else { + if (!_.isString(value)) { + throw new Error( + 'keys that don\'t end with \'-bin\' must have String values'); + } + if (!(/^[\x20-\x7E]*$/.test(value))) { + throw new Error('Metadata string values can only contain printable ' + + 'ASCII characters and space'); + } + } +} + +/** + * Sets the given value for the given key, replacing any other values associated + * with that key. Normalizes the key. + * @param {String} key The key to set + * @param {String|Buffer} value The value to set. Must be a buffer if and only + * if the normalized key ends with '-bin' + */ +Metadata.prototype.set = function(key, value) { + key = normalizeKey(key); + validate(key, value); + this._internal_repr[key] = [value]; +}; + +/** + * Adds the given value for the given key. Normalizes the key. + * @param {String} key The key to add to. + * @param {String|Buffer} value The value to add. Must be a buffer if and only + * if the normalized key ends with '-bin' + */ +Metadata.prototype.add = function(key, value) { + key = normalizeKey(key); + validate(key, value); + if (!this._internal_repr[key]) { + this._internal_repr[key] = []; + } + this._internal_repr[key].push(value); +}; + +/** + * Remove the given key and any associated values. Normalizes the key. + * @param {String} key The key to remove + */ +Metadata.prototype.remove = function(key) { + key = normalizeKey(key); + if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { + delete this._internal_repr[key]; + } +}; + +/** + * Gets a list of all values associated with the key. Normalizes the key. + * @param {String} key The key to get + * @return {Array.} The values associated with that key + */ +Metadata.prototype.get = function(key) { + key = normalizeKey(key); + if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { + return this._internal_repr[key]; + } else { + return []; + } +}; + +/** + * Get a map of each key to a single associated value. This reflects the most + * common way that people will want to see metadata. + * @return {Object.} A key/value mapping of the metadata + */ +Metadata.prototype.getMap = function() { + var result = {}; + _.forOwn(this._internal_repr, function(values, key) { + if(values.length > 0) { + result[key] = values[0]; + } + }); + return result; +}; + +/** + * Clone the metadata object. + * @return {Metadata} The new cloned object + */ +Metadata.prototype.clone = function() { + var copy = new Metadata(); + copy._internal_repr = _.cloneDeep(this._internal_repr); + return copy; +}; + +/** + * Gets the metadata in the format used by interal code. Intended for internal + * use only. API stability is not guaranteed. + * @private + * @return {Object.>} The metadata + */ +Metadata.prototype._getCoreRepresentation = function() { + return this._internal_repr; +}; + +/** + * Creates a Metadata object from a metadata map in the internal format. + * Intended for internal use only. API stability is not guaranteed. + * @private + * @param {Object.>} The metadata + * @return {Metadata} The new Metadata object + */ +Metadata._fromCoreRepresentation = function(metadata) { + var newMetadata = new Metadata(); + if (metadata) { + newMetadata._internal_repr = _.cloneDeep(metadata); + } + return newMetadata; +}; + +module.exports = Metadata; diff --git a/src/node/src/server.js b/src/node/src/server.js index 5037abae434..b6f162adf85 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -44,6 +44,8 @@ var grpc = require('bindings')('grpc.node'); var common = require('./common'); +var Metadata = require('./metadata'); + var stream = require('stream'); var Readable = stream.Readable; @@ -60,10 +62,10 @@ var EventEmitter = require('events').EventEmitter; * @param {Object} error The error object */ function handleError(call, error) { + var statusMetadata = new Metadata(); var status = { code: grpc.status.UNKNOWN, - details: 'Unknown Error', - metadata: {} + details: 'Unknown Error' }; if (error.hasOwnProperty('message')) { status.details = error.message; @@ -75,11 +77,13 @@ function handleError(call, error) { } } if (error.hasOwnProperty('metadata')) { - status.metadata = error.metadata; + statusMetadata = error.metadata; } + status.metadata = statusMetadata._getCoreRepresentation(); var error_batch = {}; if (!call.metadataSent) { - error_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + error_batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); } error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); @@ -114,22 +118,24 @@ function waitForCancel(call, emitter) { * @param {*} value The value to respond with * @param {function(*):Buffer=} serialize Serialization function for the * response - * @param {Object=} metadata Optional trailing metadata to send with status + * @param {Metadata=} metadata Optional trailing metadata to send with status * @param {number=} flags Flags for modifying how the message is sent. * Defaults to 0. */ function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; + var statusMetadata = new Metadata(); var status = { code: grpc.status.OK, - details: 'OK', - metadata: {} + details: 'OK' }; if (metadata) { - status.metadata = metadata; + statusMetadata = metadata; } + status.metadata = statusMetadata._getCoreRepresentation(); if (!call.metadataSent) { - end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + end_batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); call.metadataSent = true; } var message = serialize(value); @@ -151,14 +157,19 @@ function setUpWritable(stream, serialize) { stream.status = { code : grpc.status.OK, details : 'OK', - metadata : {} + metadata : new Metadata() }; stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { var batch = {}; if (!stream.call.metadataSent) { stream.call.metadataSent = true; - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); + } + + if (stream.status.metadata) { + stream.status.metadata = stream.status.metadata._getCoreRepresentation(); } batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); @@ -173,7 +184,7 @@ function setUpWritable(stream, serialize) { function setStatus(err) { var code = grpc.status.UNKNOWN; var details = 'Unknown Error'; - var metadata = {}; + var metadata = new Metadata(); if (err.hasOwnProperty('message')) { details = err.message; } @@ -203,7 +214,7 @@ function setUpWritable(stream, serialize) { /** * Override of Writable#end method that allows for sending metadata with a * success status. - * @param {Object=} metadata Metadata to send with the status + * @param {Metadata=} metadata Metadata to send with the status */ stream.end = function(metadata) { if (metadata) { @@ -266,7 +277,8 @@ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; if (!this.call.metadataSent) { - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); this.call.metadataSent = true; } var message = this.serialize(chunk); @@ -289,15 +301,15 @@ ServerWritableStream.prototype._write = _write; /** * Send the initial metadata for a writable stream. - * @param {Object>} responseMetadata Metadata - * to send + * @param {Metadata} responseMetadata Metadata to send */ function sendMetadata(responseMetadata) { /* jshint validthis: true */ if (!this.call.metadataSent) { this.call.metadataSent = true; var batch = []; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); this.call.startBatch(batch, function(err) { if (err) { this.emit('error', err); @@ -422,7 +434,7 @@ ServerDuplexStream.prototype.getPeer = getPeer; * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { var emitter = new EventEmitter(); @@ -430,7 +442,8 @@ function handleUnary(call, handler, metadata) { if (!call.metadataSent) { call.metadataSent = true; var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); call.startBatch(batch, function() {}); } }; @@ -478,7 +491,7 @@ function handleUnary(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); @@ -507,7 +520,7 @@ function handleServerStreaming(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); @@ -515,7 +528,8 @@ function handleClientStreaming(call, handler, metadata) { if (!call.metadataSent) { call.metadataSent = true; var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); call.startBatch(batch, function() {}); } }; @@ -542,7 +556,7 @@ function handleClientStreaming(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, @@ -599,7 +613,7 @@ function Server(options) { var details = event.new_call; var call = details.call; var method = details.method; - var metadata = details.metadata; + var metadata = Metadata._fromCoreRepresentation(details.metadata); if (method === null) { return; } @@ -609,7 +623,8 @@ function Server(options) { handler = handlers[method]; } else { var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { code: grpc.status.UNIMPLEMENTED, details: 'This method is not available on this server.', @@ -623,11 +638,26 @@ function Server(options) { } server.requestCall(handleNewCall); }; + + /** + * Gracefully shuts down the server. The server will stop receiving new calls, + * and any pending calls will complete. The callback will be called when all + * pending calls have completed and the server is fully shut down. This method + * is idempotent with itself and forceShutdown. + * @param {function()} callback The shutdown complete callback + */ + this.tryShutdown = function(callback) { + server.tryShutdown(callback); + }; + /** - * Shuts down the server. + * Forcibly shuts down the server. The server will stop receiving new calls + * and cancel all pending calls. When it returns, the server has shut down. + * This method is idempotent with itself and tryShutdown, and it will trigger + * any outstanding tryShutdown callbacks. */ - this.shutdown = function() { - server.shutdown(); + this.forceShutdown = function() { + server.forceShutdown(); }; } diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index 8d0f20b0747..e7f071bcd53 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -61,7 +61,7 @@ describe('call', function() { channel = new grpc.Channel('localhost:' + port, insecureCreds); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); describe('constructor', function() { it('should reject anything less than 3 arguments', function() { diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 7574d98b8af..4b8da3bfb17 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -70,7 +70,7 @@ describe('end-to-end', function() { channel = new grpc.Channel('localhost:' + port_num, insecureCreds); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('should start and end a request without error', function(complete) { var done = multiDone(complete, 2); diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js index 04959f5f552..9267bff7ebe 100644 --- a/src/node/test/health_test.js +++ b/src/node/test/health_test.js @@ -57,7 +57,7 @@ describe('Health Checking', function() { grpc.Credentials.createInsecure()); }); after(function() { - healthServer.shutdown(); + healthServer.forceShutdown(); }); it('should say an enabled service is SERVING', function(done) { healthClient.check({service: ''}, function(err, response) { diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 0a5eb29c0c1..2ca07c1d50d 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -51,7 +51,7 @@ describe('Interop tests', function() { done(); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); // This depends on not using a binary stream it('should pass empty_unary', function(done) { diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index ef01870a4c1..80b0c5ff2a9 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -59,7 +59,7 @@ describe('Math client', function() { done(); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('should handle a single request', function(done) { var arg = {dividend: 7, divisor: 4}; diff --git a/src/node/test/metadata_test.js b/src/node/test/metadata_test.js new file mode 100644 index 00000000000..86383f1badc --- /dev/null +++ b/src/node/test/metadata_test.js @@ -0,0 +1,193 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var Metadata = require('../src/metadata.js'); + +var assert = require('assert'); + +describe('Metadata', function() { + var metadata; + beforeEach(function() { + metadata = new Metadata(); + }); + describe('#set', function() { + it('Only accepts string values for non "-bin" keys', function() { + assert.throws(function() { + metadata.set('key', new Buffer('value')); + }); + assert.doesNotThrow(function() { + metadata.set('key', 'value'); + }); + }); + it('Only accepts Buffer values for "-bin" keys', function() { + assert.throws(function() { + metadata.set('key-bin', 'value'); + }); + assert.doesNotThrow(function() { + metadata.set('key-bin', new Buffer('value')); + }); + }); + it('Rejects invalid keys', function() { + assert.throws(function() { + metadata.set('key$', 'value'); + }); + assert.throws(function() { + metadata.set('', 'value'); + }); + }); + it('Rejects values with non-ASCII characters', function() { + assert.throws(function() { + metadata.set('key', 'résumé'); + }); + }); + it('Saves values that can be retrieved', function() { + metadata.set('key', 'value'); + assert.deepEqual(metadata.get('key'), ['value']); + }); + it('Overwrites previous values', function() { + metadata.set('key', 'value1'); + metadata.set('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value2']); + }); + it('Normalizes keys', function() { + metadata.set('Key', 'value1'); + assert.deepEqual(metadata.get('key'), ['value1']); + metadata.set('KEY', 'value2'); + assert.deepEqual(metadata.get('key'), ['value2']); + }); + }); + describe('#add', function() { + it('Only accepts string values for non "-bin" keys', function() { + assert.throws(function() { + metadata.add('key', new Buffer('value')); + }); + assert.doesNotThrow(function() { + metadata.add('key', 'value'); + }); + }); + it('Only accepts Buffer values for "-bin" keys', function() { + assert.throws(function() { + metadata.add('key-bin', 'value'); + }); + assert.doesNotThrow(function() { + metadata.add('key-bin', new Buffer('value')); + }); + }); + it('Rejects invalid keys', function() { + assert.throws(function() { + metadata.add('key$', 'value'); + }); + assert.throws(function() { + metadata.add('', 'value'); + }); + }); + it('Saves values that can be retrieved', function() { + metadata.add('key', 'value'); + assert.deepEqual(metadata.get('key'), ['value']); + }); + it('Combines with previous values', function() { + metadata.add('key', 'value1'); + metadata.add('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + it('Normalizes keys', function() { + metadata.add('Key', 'value1'); + assert.deepEqual(metadata.get('key'), ['value1']); + metadata.add('KEY', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + }); + describe('#remove', function() { + it('clears values from a key', function() { + metadata.add('key', 'value'); + metadata.remove('key'); + assert.deepEqual(metadata.get('key'), []); + }); + it('Normalizes keys', function() { + metadata.add('key', 'value'); + metadata.remove('KEY'); + assert.deepEqual(metadata.get('key'), []); + }); + }); + describe('#get', function() { + beforeEach(function() { + metadata.add('key', 'value1'); + metadata.add('key', 'value2'); + metadata.add('key-bin', new Buffer('value')); + }); + it('gets all values associated with a key', function() { + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + it('Normalizes keys', function() { + assert.deepEqual(metadata.get('KEY'), ['value1', 'value2']); + }); + it('returns an empty list for non-existent keys', function() { + assert.deepEqual(metadata.get('non-existent-key'), []); + }); + it('returns Buffers for "-bin" keys', function() { + assert(metadata.get('key-bin')[0] instanceof Buffer); + }); + }); + describe('#getMap', function() { + it('gets a map of keys to values', function() { + metadata.add('key1', 'value1'); + metadata.add('Key2', 'value2'); + metadata.add('KEY3', 'value3'); + assert.deepEqual(metadata.getMap(), + {key1: 'value1', + key2: 'value2', + key3: 'value3'}); + }); + }); + describe('#clone', function() { + it('retains values from the original', function() { + metadata.add('key', 'value'); + var copy = metadata.clone(); + assert.deepEqual(copy.get('key'), ['value']); + }); + it('Does not see newly added values', function() { + metadata.add('key', 'value1'); + var copy = metadata.clone(); + metadata.add('key', 'value2'); + assert.deepEqual(copy.get('key'), ['value1']); + }); + it('Does not add new values to the original', function() { + metadata.add('key', 'value1'); + var copy = metadata.clone(); + copy.add('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1']); + }); + }); +}); diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js index 78bac8da294..1e69d52e583 100644 --- a/src/node/test/server_test.js +++ b/src/node/test/server_test.js @@ -92,7 +92,7 @@ describe('server', function() { server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure()); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('should start without error', function() { assert.doesNotThrow(function() { @@ -100,4 +100,33 @@ describe('server', function() { }); }); }); + describe('shutdown', function() { + var server; + beforeEach(function() { + server = new grpc.Server(); + server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure()); + server.start(); + }); + afterEach(function() { + server.forceShutdown(); + }); + it('tryShutdown should shutdown successfully', function(done) { + server.tryShutdown(done); + }); + it('forceShutdown should shutdown successfully', function() { + server.forceShutdown(); + }); + it('tryShutdown should be idempotent', function(done) { + server.tryShutdown(done); + server.tryShutdown(function() {}); + }); + it('forceShutdown should be idempotent', function() { + server.forceShutdown(); + server.forceShutdown(); + }); + it('forceShutdown should trigger tryShutdown', function(done) { + server.tryShutdown(done); + server.forceShutdown(); + }); + }); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index ec7ed87728d..7c2a8d72583 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -104,7 +104,7 @@ describe('Server.prototype.addProtoService', function() { server = new grpc.Server(); }); afterEach(function() { - server.shutdown(); + server.forceShutdown(); }); it('Should succeed with a single service', function() { assert.doesNotThrow(function() { @@ -148,7 +148,7 @@ describe('Client#$waitForReady', function() { client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('should complete when called alone', function(done) { client.$waitForReady(Infinity, function(error) { @@ -203,7 +203,7 @@ describe('Echo service', function() { server.start(); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('should echo the recieved message directly', function(done) { client.echo({value: 'test value', value2: 3}, function(error, response) { @@ -248,7 +248,7 @@ describe('Generic client and server', function() { grpc.Credentials.createInsecure()); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('Should respond with a capitalized string', function(done) { client.capitalize('abc', function(err, response) { @@ -262,6 +262,7 @@ describe('Generic client and server', function() { describe('Echo metadata', function() { var client; var server; + var metadata; before(function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); @@ -294,42 +295,44 @@ describe('Echo metadata', function() { var Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); server.start(); + metadata = new grpc.Metadata(); + metadata.set('key', 'value'); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('with unary call', function(done) { var call = client.unary({}, function(err, data) { assert.ifError(err); - }, {key: ['value']}); + }, metadata); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); }); it('with client stream call', function(done) { var call = client.clientStream(function(err, data) { assert.ifError(err); - }, {key: ['value']}); + }, metadata); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); call.end(); }); it('with server stream call', function(done) { - var call = client.serverStream({}, {key: ['value']}); + var call = client.serverStream({}, metadata); call.on('data', function() {}); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); }); it('with bidi stream call', function(done) { - var call = client.bidiStream({key: ['value']}); + var call = client.bidiStream(metadata); call.on('data', function() {}); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); call.end(); @@ -337,9 +340,10 @@ describe('Echo metadata', function() { it('shows the correct user-agent string', function(done) { var version = require('../package.json').version; var call = client.unary({}, function(err, data) { assert.ifError(err); }, - {key: ['value']}); + metadata); call.on('metadata', function(metadata) { - assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version)); + assert(_.startsWith(metadata.get('user-agent')[0], + 'grpc-node/' + version)); done(); }); }); @@ -354,13 +358,15 @@ describe('Other conditions', function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); test_service = test_proto.lookup('TestService'); server = new grpc.Server(); + var trailer_metadata = new grpc.Metadata(); + trailer_metadata.add('trailer-present', 'yes'); server.addProtoService(test_service, { unary: function(call, cb) { var req = call.request; if (req.error) { - cb(new Error('Requested error'), null, {trailer_present: ['yes']}); + cb(new Error('Requested error'), null, trailer_metadata); } else { - cb(null, {count: 1}, {trailer_present: ['yes']}); + cb(null, {count: 1}, trailer_metadata); } }, clientStream: function(stream, cb){ @@ -369,14 +375,14 @@ describe('Other conditions', function() { stream.on('data', function(data) { if (data.error) { errored = true; - cb(new Error('Requested error'), null, {trailer_present: ['yes']}); + cb(new Error('Requested error'), null, trailer_metadata); } else { count += 1; } }); stream.on('end', function() { if (!errored) { - cb(null, {count: count}, {trailer_present: ['yes']}); + cb(null, {count: count}, trailer_metadata); } }); }, @@ -384,13 +390,13 @@ describe('Other conditions', function() { var req = stream.request; if (req.error) { var err = new Error('Requested error'); - err.metadata = {trailer_present: ['yes']}; + err.metadata = trailer_metadata; stream.emit('error', err); } else { for (var i = 0; i < 5; i++) { stream.write({count: i}); } - stream.end({trailer_present: ['yes']}); + stream.end(trailer_metadata); } }, bidiStream: function(stream) { @@ -398,10 +404,8 @@ describe('Other conditions', function() { stream.on('data', function(data) { if (data.error) { var err = new Error('Requested error'); - err.metadata = { - trailer_present: ['yes'], - count: ['' + count] - }; + err.metadata = trailer_metadata.clone(); + err.metadata.add('count', '' + count); stream.emit('error', err); } else { stream.write({count: count}); @@ -409,7 +413,7 @@ describe('Other conditions', function() { } }); stream.on('end', function() { - stream.end({trailer_present: ['yes']}); + stream.end(trailer_metadata); }); } }); @@ -419,7 +423,7 @@ describe('Other conditions', function() { server.start(); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('channel.getTarget should be available', function() { assert.strictEqual(typeof client.channel.getTarget(), 'string'); @@ -510,7 +514,7 @@ describe('Other conditions', function() { assert.ifError(err); }); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -519,7 +523,7 @@ describe('Other conditions', function() { assert(err); }); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -531,7 +535,7 @@ describe('Other conditions', function() { call.write({error: false}); call.end(); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -543,7 +547,7 @@ describe('Other conditions', function() { call.write({error: true}); call.end(); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -552,7 +556,7 @@ describe('Other conditions', function() { call.on('data', function(){}); call.on('status', function(status) { assert.strictEqual(status.code, grpc.status.OK); - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -560,7 +564,7 @@ describe('Other conditions', function() { var call = client.serverStream({error: true}); call.on('data', function(){}); call.on('error', function(error) { - assert.deepEqual(error.metadata.trailer_present, ['yes']); + assert.deepEqual(error.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -572,7 +576,7 @@ describe('Other conditions', function() { call.on('data', function(){}); call.on('status', function(status) { assert.strictEqual(status.code, grpc.status.OK); - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -583,7 +587,7 @@ describe('Other conditions', function() { call.end(); call.on('data', function(){}); call.on('error', function(error) { - assert.deepEqual(error.metadata.trailer_present, ['yes']); + assert.deepEqual(error.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -681,7 +685,7 @@ describe('Other conditions', function() { }); afterEach(function() { console.log('Shutting down server'); - proxy.shutdown(); + proxy.forceShutdown(); }); describe('Cancellation', function() { it('With a unary call', function(done) { @@ -847,7 +851,7 @@ describe('Cancelling surface client', function() { server.start(); }); after(function() { - server.shutdown(); + server.forceShutdown(); }); it('Should correctly cancel a unary call', function(done) { var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {