Create benchmark client and server for Node Express

pull/8419/head
murgatroid99 8 years ago
parent 58628ae7a7
commit b53e5d1f2b
  1. 12
      package.json
  2. 289
      src/node/performance/benchmark_client_express.js
  3. 5
      src/node/performance/benchmark_server.js
  4. 107
      src/node/performance/benchmark_server_express.js
  5. 10
      src/node/performance/worker.js
  6. 36
      src/node/performance/worker_service_impl.js
  7. 61
      tools/run_tests/performance/scenario_config.py
  8. 48
      tools/run_tests/run_tests.py

@ -25,7 +25,9 @@
"coverage": "./node_modules/.bin/istanbul cover ./node_modules/.bin/_mocha src/node/test",
"install": "./node_modules/.bin/node-pre-gyp install --fallback-to-build"
},
"bundledDependencies": ["node-pre-gyp"],
"bundledDependencies": [
"node-pre-gyp"
],
"dependencies": {
"arguejs": "^0.2.3",
"lodash": "^3.9.3",
@ -34,6 +36,7 @@
},
"devDependencies": {
"async": "^1.5.0",
"express": "^4.14.0",
"google-auth-library": "^0.9.2",
"google-protobuf": "^3.0.0",
"istanbul": "^0.3.21",
@ -50,11 +53,10 @@
},
"binary": {
"module_name": "grpc_node",
"module_path": "./build/Release/",
"module_path": "src/node/extension_binary",
"host": "https://storage.googleapis.com/",
"remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
"package_name": "{node_abi}-{platform}-{arch}.tar.gz",
"module_path": "src/node/extension_binary"
"package_name": "{node_abi}-{platform}-{arch}.tar.gz"
},
"files": [
"LICENSE",
@ -75,7 +77,7 @@
],
"main": "src/node/index.js",
"license": "BSD-3-Clause",
"jshintConfig" : {
"jshintConfig": {
"bitwise": true,
"curly": true,
"eqeqeq": true,

@ -0,0 +1,289 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Benchmark client module
* @module
*/
'use strict';
var fs = require('fs');
var path = require('path');
var util = require('util');
var EventEmitter = require('events');
var http = require('http');
var https = require('https');
var async = require('async');
var _ = require('lodash');
var PoissonProcess = require('poisson-process');
var Histogram = require('./histogram');
/**
* Convert a time difference, as returned by process.hrtime, to a number of
* nanoseconds.
* @param {Array.<number>} time_diff The time diff, represented as
* [seconds, nanoseconds]
* @return {number} The total number of nanoseconds
*/
function timeDiffToNanos(time_diff) {
return time_diff[0] * 1e9 + time_diff[1];
}
function BenchmarkClient(server_targets, channels, histogram_params,
security_params) {
var options = {
method: 'PUT',
headers: {
'Content-Type': 'application/json'
}
};
var protocol;
if (security_params) {
var ca_path;
protocol = https;
this.request = _.bind(https.request, https);
if (security_params.use_test_ca) {
ca_path = path.join(__dirname, '../test/data/ca.pem');
var ca_data = fs.readFileSync(ca_path);
options.ca = ca_data;
}
if (security_params.server_host_override) {
var host_override = security_params.server_host_override;
options.servername = host_override;
}
} else {
protocol = http;
}
this.request = _.bind(protocol.request, protocol);
this.client_options = [];
for (var i = 0; i < channels; i++) {
var new_options = _.assign({host: server_targets[i]}, options);
new_options.agent = new protocol.Agent(new_options);
this.client_options[i] = new_options;
}
this.histogram = new Histogram(histogram_params.resolution,
histogram_params.max_possible);
this.running = false;
this.pending_calls = 0;
}
util.inherits(BenchmarkClient, EventEmitter);
function startAllClients(client_options_list, outstanding_rpcs_per_channel,
makeCall, emitter) {
_.each(client_options_list, function(client_options) {
_.times(outstanding_rpcs_per_channel, function() {
makeCall(client_options);
});
});
}
BenchmarkClient.prototype.startClosedLoop = function(
outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
var self = this;
var options = {};
self.running = true;
if (rpc_type == 'UNARY') {
options.path = '/serviceProto.BenchmarkService.service/unaryCall';
} else {
self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
}
if (generic) {
self.emit('error', new Error('Generic client not supported'));
}
self.last_wall_time = process.hrtime();
var argument = {
response_size: resp_size,
payload: {
body: '0'.repeat(req_size)
}
};
function makeCall(client_options) {
if (self.running) {
self.pending_calls++;
var start_time = process.hrtime();
var req = self.request(client_options, function(res) {
var res_data = '';
res.on('data', function(data) {
res_data += data;
});
res.on('end', function() {
JSON.parse(res_data);
var time_diff = process.hrtime(start_time);
self.histogram.add(timeDiffToNanos(time_diff));
makeCall(client_options);
self.pending_calls--;
if ((!self.running) && self.pending_calls == 0) {
self.emit('finished');
}
});
});
req.write(JSON.stringify(argument));
req.end();
req.on('error', function(error) {
self.emit('error', new Error('Client error: ' + error.message));
self.running = false;
});
}
}
startAllClients(_.assign(options, self.client_options),
outstanding_rpcs_per_channel, makeCall, self);
};
BenchmarkClient.prototype.startPoisson = function(
outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
generic) {
var self = this;
var options = {};
self.running = true;
if (rpc_type == 'UNARY') {
options.path = '/serviceProto.BenchmarkService.service/unaryCall';
} else {
self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
}
if (generic) {
self.emit('error', new Error('Generic client not supported'));
}
self.last_wall_time = process.hrtime();
var argument = {
response_size: resp_size,
payload: {
body: '0'.repeat(req_size)
}
};
function makeCall(client_options, poisson) {
if (self.running) {
self.pending_calls++;
var start_time = process.hrtime();
var req = self.request(client_options, function(res) {
var res_data = '';
res.on('data', function(data) {
res_data += data;
});
res.on('end', function() {
JSON.parse(res_data);
var time_diff = process.hrtime(start_time);
self.histogram.add(timeDiffToNanos(time_diff));
self.pending_calls--;
if ((!self.running) && self.pending_calls == 0) {
self.emit('finished');
}
});
});
req.write(JSON.stringify(argument));
req.end();
req.on('error', function(error) {
self.emit('error', new Error('Client error: ' + error.message));
self.running = false;
});
} else {
poisson.stop();
}
}
var averageIntervalMs = (1 / offered_load) * 1000;
startAllClients(_.assign(options, self.client_options),
outstanding_rpcs_per_channel, function(opts){
var p = PoissonProcess.create(averageIntervalMs, function() {
makeCall(opts, p);
});
p.start();
}, self);
};
/**
* Return curent statistics for the client. If reset is set, restart
* statistic collection.
* @param {boolean} reset Indicates that statistics should be reset
* @return {object} Client statistics
*/
BenchmarkClient.prototype.mark = function(reset) {
var wall_time_diff = process.hrtime(this.last_wall_time);
var histogram = this.histogram;
if (reset) {
this.last_wall_time = process.hrtime();
this.histogram = new Histogram(histogram.resolution,
histogram.max_possible);
}
return {
latencies: {
bucket: histogram.getContents(),
min_seen: histogram.minimum(),
max_seen: histogram.maximum(),
sum: histogram.getSum(),
sum_of_squares: histogram.sumOfSquares(),
count: histogram.getCount()
},
time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
// Not sure how to measure these values
time_user: 0,
time_system: 0
};
};
/**
* Stop the clients.
* @param {function} callback Called when the clients have finished shutting
* down
*/
BenchmarkClient.prototype.stop = function(callback) {
this.running = false;
this.on('finished', callback);
};
module.exports = BenchmarkClient;

@ -40,6 +40,8 @@
var fs = require('fs');
var path = require('path');
var EventEmitter = require('events');
var util = require('util');
var genericService = require('./generic_service');
@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.server = server;
}
util.inherits(BenchmarkServer, EventEmitter);
/**
* Start the benchmark server.
*/
BenchmarkServer.prototype.start = function() {
this.server.start();
this.last_wall_time = process.hrtime();
this.emit('started');
};
/**

@ -0,0 +1,107 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Benchmark server module
* @module
*/
'use strict';
var fs = require('fs');
var path = require('path');
var http = require('http');
var https = require('https');
var EventEmitter = require('events');
var util = require('util');
var express = require('express');
function unaryCall(req, res) {
var reqObj = JSON.parse(req.body);
var payload = {body: '0'.repeat(reqObj.response_size)};
res.send(JSON.dumps(payload));
}
function BenchmarkServer(host, port, tls, generic, response_size) {
var app = express();
app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
this.input_host = host;
this.input_port = port;
if (tls) {
var credentials = {};
var key_path = path.join(__dirname, '../test/data/server1.key');
var pem_path = path.join(__dirname, '../test/data/server1.pem');
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
credentials['key'] = key_data;
credentials['cert'] = pem_data;
this.server = https.createServer(credentials, app);
} else {
this.server = http.createServer(app);
}
}
util.inherits(BenchmarkServer, EventEmitter);
BenchmarkServer.prototype.start = function() {
var self = this;
this.server.listen(this.input_port, this.input_hostname, function() {
this.last_wall_time = process.hrtime();
self.emit('started');
});
};
BenchmarkServer.prototype.getPort = function() {
return this.server.address().port;
};
BenchmarkServer.prototype.mark = function(reset) {
var wall_time_diff = process.hrtime(this.last_wall_time);
if (reset) {
this.last_wall_time = process.hrtime();
}
return {
time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
// Not sure how to measure these values
time_user: 0,
time_system: 0
};
};
BenchmarkServer.prototype.stop = function(callback) {
this.server.close(callback);
};
module.exports = BenchmarkServer;

@ -34,18 +34,18 @@
'use strict';
var console = require('console');
var worker_service_impl = require('./worker_service_impl');
var WorkerServiceImpl = require('./worker_service_impl');
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
function runServer(port) {
function runServer(port, benchmark_impl) {
var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server();
server.addProtoService(serviceProto.WorkerService.service,
worker_service_impl);
new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port;
server.bind(address, server_creds);
server.start();
@ -57,9 +57,9 @@ if (require.main === module) {
Error.stackTraceLimit = Infinity;
var parseArgs = require('minimist');
var argv = parseArgs(process.argv, {
string: ['driver_port']
string: ['driver_port', 'benchmark_impl']
});
runServer(argv.driver_port);
runServer(argv.driver_port, argv.benchmark_impl);
}
exports.runServer = runServer;

@ -38,12 +38,29 @@ var console = require('console');
var BenchmarkClient = require('./benchmark_client');
var BenchmarkServer = require('./benchmark_server');
exports.quitWorker = function quitWorker(call, callback) {
module.exports = function WorkerServiceImpl(benchmark_impl, server) {
var BenchmarkClient;
var BenchmarkServer;
switch (benchmark_impl) {
case 'grpc':
BenchmarkClient = require('./benchmark_client');
BenchmarkServer = require('./benchmark_server');
break;
case 'express':
BenchmarkClient = require('./benchmark_client_express');
BenchmarkServer = require('./benchmark_server_express');
break;
default:
throw new Error('Unrecognized benchmark impl: ' + benchmark_impl);
}
this.quitWorker = function quitWorker(call, callback) {
server.tryShutdown(function() {
callback(null, {});
process.exit(0);
}
});
};
exports.runClient = function runClient(call) {
this.runClient = function runClient(call) {
var client;
call.on('data', function(request) {
var stats;
@ -112,9 +129,9 @@ exports.runClient = function runClient(call) {
call.end();
});
});
};
};
exports.runServer = function runServer(call) {
this.runServer = function runServer(call) {
var server;
call.on('data', function(request) {
var stats;
@ -124,11 +141,13 @@ exports.runServer = function runServer(call) {
server = new BenchmarkServer('[::]', request.setup.port,
request.setup.security_params);
server.start();
server.on('started', function() {
stats = server.mark();
call.write({
stats: stats,
port: server.getPort()
});
});
break;
case 'mark':
if (server) {
@ -151,8 +170,9 @@ exports.runServer = function runServer(call) {
call.end();
});
});
};
};
exports.coreCount = function coreCount(call, callback) {
this.coreCount = function coreCount(call, callback) {
callback(null, {cores: os.cpus().length});
};
};

@ -338,7 +338,8 @@ class NodeLanguage:
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_node.sh']
return ['tools/run_tests/performance/run_worker_node.sh',
'--benchmark_impl=grpc']
def worker_port_offset(self):
return 200
@ -636,11 +637,69 @@ class GoLanguage:
def __str__(self):
return 'go'
class NodeExpressLanguage:
def __init__(self):
pass
self.safename = str(self)
def worker_cmdline(self):
return ['tools/run_tests/performance/run_worker_node.sh',
'--benchmark_impl=express']
def worker_port_offset(self):
return 700
def scenarios(self):
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_generic_async_streaming_ping_pong', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
# use_generic_payload=True)
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
'node_protobuf_unary_ping_pong', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
categories=[SCALABLE, SMOKETEST])
yield _ping_pong_scenario(
'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='async',
categories=[SCALABLE, SMOKETEST])
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
# unconstrained_client='async')
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_to_cpp_protobuf_async_unary_ping_pong', rpc_type='UNARY',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
# server_language='c++', server_core_limit=1, async_server_threads=1)
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
# server_language='c++', server_core_limit=1, async_server_threads=1)
def __str__(self):
return 'node_express'
LANGUAGES = {
'c++' : CXXLanguage(),
'csharp' : CSharpLanguage(),
'node' : NodeLanguage(),
'node_express': NodeExpressLanguage(),
'ruby' : RubyLanguage(),
'java' : JavaLanguage(),
'python' : PythonLanguage(),

@ -825,6 +825,53 @@ class Sanity(object):
def __str__(self):
return 'sanity'
class NodeExpressLanguage(object):
"""Dummy Node express test target to enable running express performance
benchmarks"""
def __init__(self):
self.platform = platform_string()
def configure(self, config, args):
self.config = config
self.args = args
_check_compiler(self.args.compiler, ['default', 'node0.12',
'node4', 'node5', 'node6'])
if self.args.compiler == 'default':
self.node_version = '4'
else:
# Take off the word "node"
self.node_version = self.args.compiler[4:]
def test_specs(self):
return []
def pre_build_steps(self):
if self.platform == 'windows':
return [['tools\\run_tests\\pre_build_node.bat']]
else:
return [['tools/run_tests/pre_build_node.sh', self.node_version]]
def make_targets(self):
return []
def make_options(self):
return []
def build_steps(self):
return []
def post_tests_steps(self):
return []
def makefile_name(self):
return 'Makefile'
def dockerfile_dir(self):
return 'tools/dockerfile/test/node_jessie_%s' % _docker_arch_suffix(self.args.arch)
def __str__(self):
return 'node_express'
# different configurations we can run under
with open('tools/run_tests/configs.json') as f:
@ -835,6 +882,7 @@ _LANGUAGES = {
'c++': CLanguage('cxx', 'c++'),
'c': CLanguage('c', 'c'),
'node': NodeLanguage(),
'node_express': NodeExpressLanguage(),
'php': PhpLanguage(),
'php7': Php7Language(),
'python': PythonLanguage(),

Loading…
Cancel
Save