From 40f572657970b26337fa7f89a6c506a02cc9fd05 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 17 Jun 2016 09:42:19 -0700 Subject: [PATCH] Node QPS worker: wait for clients to be ready before making calls --- src/node/performance/benchmark_client.js | 50 ++++++++++++++++++------ 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js index 262aa338629..5ef5260a96d 100644 --- a/src/node/performance/benchmark_client.js +++ b/src/node/performance/benchmark_client.js @@ -42,6 +42,8 @@ var fs = require('fs'); var path = require('path'); var util = require('util'); var EventEmitter = require('events'); + +var async = require('async'); var _ = require('lodash'); var PoissonProcess = require('poisson-process'); var Histogram = require('./histogram'); @@ -127,6 +129,36 @@ function BenchmarkClient(server_targets, channels, histogram_params, util.inherits(BenchmarkClient, EventEmitter); +/** + * Start every client in the list of clients by waiting for each to be ready, + * then starting outstanding_rpcs_per_channel calls on each of them + * @param {Array} client_list The list of clients + * @param {Number} outstanding_rpcs_per_channel The number of calls to start + * on each client + * @param {function(grpc.Client)} makeCall Function to make a single call on + * a single client + * @param {EventEmitter} emitter The event emitter to send errors on, if + * necessary + */ +function startAllClients(client_list, outstanding_rpcs_per_channel, makeCall, + emitter) { + var ready_wait_funcs = _.map(client_list, function(client) { + return _.partial(grpc.waitForClientReady, client, Infinity); + }); + async.parallel(ready_wait_funcs, function(err) { + if (err) { + emitter.emit('error', err); + return; + } + + _.each(client_list, function(client) { + _.times(outstanding_rpcs_per_channel, function() { + makeCall(client); + }); + }); + }); +} + /** * Start a closed-loop test. For each channel, start * outstanding_rpcs_per_channel RPCs. Then, whenever an RPC finishes, start @@ -212,11 +244,7 @@ BenchmarkClient.prototype.startClosedLoop = function( }; } - _.each(client_list, function(client) { - _.times(outstanding_rpcs_per_channel, function() { - makeCall(client); - }); - }); + startAllClients(client_list, outstanding_rpcs_per_channel, makeCall, self); }; /** @@ -310,14 +338,12 @@ BenchmarkClient.prototype.startPoisson = function( var averageIntervalMs = (1 / offered_load) * 1000; - _.each(client_list, function(client) { - _.times(outstanding_rpcs_per_channel, function() { - var p = PoissonProcess.create(averageIntervalMs, function() { - makeCall(client, p); - }); - p.start(); + startAllClients(client_list, outstanding_rpcs_per_channel, function(client){ + var p = PoissonProcess.create(averageIntervalMs, function() { + makeCall(client, p); }); - }); + p.start(); + }, self); }; /**