diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 5321005c863..cc527364b4a 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -35,6 +35,7 @@ var fs = require('fs'); var path = require('path'); +var async = require('async'); var _ = require('lodash'); var grpc = require('..'); var testProto = grpc.load({ @@ -86,6 +87,22 @@ function getEchoTrailer(call) { return response_trailer; } +/** + * @typedef Payload + * @type {object} + * @property {string} payload_type The payload type + * @property {Buffer} body The payload body + */ + +/** + * Get a payload of the specified type and size. If the requested payload is + * COMPRESSABLE, it returns a zero buffer. If the type is UNCOMRESSABLE, it + * returns a slice of pre-loaded uncompressable data. If the type is RANDOM, + * it returns one of the other choices, chosen at random. + * @param {string} payload_type The type of payload to return + * @param {Number} size The size of the payload body + * @return {Payload} The requested payload + */ function getPayload(payload_type, size) { if (payload_type === 'RANDOM') { payload_type = ['COMPRESSABLE', @@ -99,6 +116,15 @@ function getPayload(payload_type, size) { return {type: payload_type, body: body}; } +function respondWithStream(call, request, callback) { + async.eachSeries(request.response_parameters, function(resp_param, callback) { + setTimeout(function() { + call.write({payload: getPayload(request.response_type, resp_param.size)}); + callback(); + }, resp_param.interval_us/1000); + }, callback); +} + /** * Respond to an empty parameter with an empty response. * NOTE: this currently does not work due to issue #137 @@ -162,10 +188,13 @@ function handleStreamingOutput(call) { call.emit('error', status); return; } - _.each(req.response_parameters, function(resp_param) { - call.write({payload: getPayload(req.response_type, resp_param.size)}); + respondWithStream(call, req, function(err) { + if (err) { + call.emit(err); + } else { + call.end(getEchoTrailer(call)); + } }); - call.end(getEchoTrailer(call)); } /** @@ -175,6 +204,7 @@ function handleStreamingOutput(call) { */ function handleFullDuplex(call) { echoHeader(call); + var call_ended; call.on('data', function(value) { if (value.response_status) { var status = value.response_status; @@ -182,12 +212,17 @@ function handleFullDuplex(call) { call.emit('error', status); return; } - _.each(value.response_parameters, function(resp_param) { - call.write({payload: getPayload(value.response_type, resp_param.size)}); + call.pause(); + respondWithStream(call, value, function(err) { + call.resume(); + if (call_ended) { + call.end(getEchoTrailer(call)); + } }); }); call.on('end', function() { - call.end(getEchoTrailer(call)); + call_ended = true; + }); }