QPS worker no longer needs to specify server port on command line. This is part

of the proto if desired, or just goes to pick_unused_port_or_die if not
specified
reviewable/pr3905/r5
vjpai 9 years ago
parent 119c103ab0
commit 72a6332138
  1. 3
      test/cpp/qps/driver.cc
  2. 20
      test/cpp/qps/qps_worker.cc
  3. 2
      test/cpp/qps/qps_worker.h
  4. 16
      test/cpp/qps/server.h
  5. 10
      test/cpp/qps/server_async.cc
  6. 18
      test/cpp/qps/server_sync.cc
  7. 4
      test/cpp/qps/single_run_localhost.sh
  8. 5
      test/cpp/qps/worker.cc
  9. 1
      test/proto/perf_tests/perf_control.proto

@ -132,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
local_workers.emplace_back(new QpsWorker(driver_port));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {

@ -76,13 +76,12 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
std::unique_ptr<Server> CreateServer(const ServerConfig& config,
int server_port) {
std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
switch (config.server_type()) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, server_port);
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config, server_port);
return CreateAsyncServer(config);
default:
abort();
}
@ -91,8 +90,7 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config,
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
explicit WorkerServiceImpl(int server_port)
: server_port_(server_port), acquired_(false) {}
explicit WorkerServiceImpl() : acquired_(false) {}
Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
@ -191,12 +189,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
auto server = CreateServer(args.setup(), server_port_);
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
ServerStatus status;
status.set_port(server_port_);
status.set_port(server->Port());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
@ -211,14 +209,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
return Status::OK;
}
const int server_port_;
std::mutex mu_;
bool acquired_;
};
QpsWorker::QpsWorker(int driver_port, int server_port) {
impl_.reset(new WorkerServiceImpl(server_port));
QpsWorker::QpsWorker(int driver_port) {
impl_.reset(new WorkerServiceImpl());
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);

@ -46,7 +46,7 @@ class WorkerServiceImpl;
class QpsWorker {
public:
QpsWorker(int driver_port, int server_port);
explicit QpsWorker(int driver_port);
~QpsWorker();
private:

@ -34,6 +34,7 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
#include "test/proto/messages.grpc.pb.h"
#include "test/proto/perf_tests/perf_control.grpc.pb.h"
@ -43,7 +44,13 @@ namespace testing {
class Server {
public:
Server() : timer_(new Timer) {}
explicit Server(const ServerConfig& config) : timer_(new Timer) {
if (config.port()) {
port_ = config.port();
} else {
port_ = grpc_pick_unused_port_or_die();
}
}
virtual ~Server() {}
ServerStats Mark(bool reset) {
@ -75,13 +82,14 @@ class Server {
return true;
}
int Port() const {return port_;}
private:
int port_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
int port);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc

@ -57,9 +57,10 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) {
explicit AsyncQpsServerTest(const ServerConfig &config): Server(config) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
gpr_join_host_port(&server_address, "::", Port());
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
@ -333,9 +334,8 @@ class AsyncQpsServerTest : public Server {
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config));
}
} // namespace testing

@ -84,30 +84,28 @@ class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
explicit SynchronousServer(const ServerConfig& config)
: Server(config) {
ServerBuilder builder;
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
gpr_join_host_port(&server_address, "::", Port());
builder.AddListeningPort(server_address, InsecureServerCredentials());
gpr_free(server_address);
builder.RegisterService(&service_);
return builder.BuildAndStart();
impl_ = builder.BuildAndStart();
}
private:
BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new SynchronousServer(config, port));
const ServerConfig& config) {
return std::unique_ptr<Server>(new SynchronousServer(config));
}
} // namespace testing

@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
bins/$config/qps_worker -driver_port 10000 &
PID1=$!
bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
bins/$config/qps_worker -driver_port 10010 &
PID2=$!
export QPS_WORKERS="localhost:10000,localhost:10010"

@ -43,8 +43,7 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
DEFINE_int32(driver_port, 0, "Driver server port.");
DEFINE_int32(server_port, 0, "Spawned server port.");
DEFINE_int32(driver_port, 0, "Port for communication with driver");
static bool got_sigint = false;
@ -54,7 +53,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
QpsWorker worker(FLAGS_driver_port);
while (!got_sigint) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),

@ -116,6 +116,7 @@ message ServerConfig {
int32 threads = 2;
bool use_tls = 3;
string host = 4;
int32 port = 5;
}
message ServerArgs {

Loading…
Cancel
Save