From e9a6eb7332eef19805e1177e813dfd301096743d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Apr 2015 15:51:41 -0700 Subject: [PATCH 1/2] Allow RunScenarios to spawn in-process workers This allows us to get back to single binary tests where appropriate, which will help in-depth profiling efforts. I've built this atop my smoke_test changes as they inspired me to get this done. --- Makefile | 23 ++-- build.json | 10 +- test/core/util/port_posix.c | 33 ++++- test/cpp/qps/driver.cc | 28 ++++- test/cpp/qps/driver.h | 3 +- test/cpp/qps/qps_driver.cc | 4 +- test/cpp/qps/qps_worker.cc | 233 ++++++++++++++++++++++++++++++++++++ test/cpp/qps/qps_worker.h | 60 ++++++++++ test/cpp/qps/smoke_test.cc | 8 +- test/cpp/qps/smoke_test.sh | 28 ----- test/cpp/qps/worker.cc | 189 +---------------------------- 11 files changed, 385 insertions(+), 234 deletions(-) create mode 100644 test/cpp/qps/qps_worker.cc create mode 100644 test/cpp/qps/qps_worker.h delete mode 100755 test/cpp/qps/smoke_test.sh diff --git a/Makefile b/Makefile index 6665eb418cb..47ce0cff057 100644 --- a/Makefile +++ b/Makefile @@ -3728,7 +3728,12 @@ $(OBJDIR)/$(CONFIG)/examples/pubsub/subscriber.o: $(GENDIR)/examples/pubsub/ LIBQPS_SRC = \ $(GENDIR)/test/cpp/qps/qpstest.pb.cc \ + test/cpp/qps/client_async.cc \ + test/cpp/qps/client_sync.cc \ test/cpp/qps/driver.cc \ + test/cpp/qps/qps_worker.cc \ + test/cpp/qps/server_async.cc \ + test/cpp/qps/server_sync.cc \ test/cpp/qps/timer.cc \ @@ -3757,7 +3762,12 @@ ifneq ($(OPENSSL_DEP),) # installing headers to their final destination on the drive. We need this # otherwise parallel compilation will fail if a source is compiled first. test/cpp/qps/qpstest.proto: $(OPENSSL_DEP) +test/cpp/qps/client_async.cc: $(OPENSSL_DEP) +test/cpp/qps/client_sync.cc: $(OPENSSL_DEP) test/cpp/qps/driver.cc: $(OPENSSL_DEP) +test/cpp/qps/qps_worker.cc: $(OPENSSL_DEP) +test/cpp/qps/server_async.cc: $(OPENSSL_DEP) +test/cpp/qps/server_sync.cc: $(OPENSSL_DEP) test/cpp/qps/timer.cc: $(OPENSSL_DEP) endif @@ -3784,7 +3794,12 @@ endif endif +$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/qps/driver.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_worker.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/qps/timer.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc @@ -8775,10 +8790,6 @@ endif QPS_WORKER_SRC = \ - test/cpp/qps/client_async.cc \ - test/cpp/qps/client_sync.cc \ - test/cpp/qps/server_async.cc \ - test/cpp/qps/server_sync.cc \ test/cpp/qps/worker.cc \ QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC)))) @@ -8809,10 +8820,6 @@ endif endif -$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a -$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a -$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a -$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep) diff --git a/build.json b/build.json index f2a39c6f2c5..adf6128339c 100644 --- a/build.json +++ b/build.json @@ -554,11 +554,17 @@ "language": "c++", "headers": [ "test/cpp/qps/driver.h", + "test/cpp/qps/qps_worker.h", "test/cpp/qps/timer.h" ], "src": [ "test/cpp/qps/qpstest.proto", + "test/cpp/qps/client_async.cc", + "test/cpp/qps/client_sync.cc", "test/cpp/qps/driver.cc", + "test/cpp/qps/qps_worker.cc", + "test/cpp/qps/server_async.cc", + "test/cpp/qps/server_sync.cc", "test/cpp/qps/timer.cc" ] }, @@ -2007,10 +2013,6 @@ "test/cpp/qps/server.h" ], "src": [ - "test/cpp/qps/client_async.cc", - "test/cpp/qps/client_sync.cc", - "test/cpp/qps/server_async.cc", - "test/cpp/qps/server_sync.cc", "test/cpp/qps/worker.cc" ], "deps": [ diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 7467c2f9ea4..726ee3bd6c7 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -44,10 +44,37 @@ #include #include +#include #include #define NUM_RANDOM_PORTS_TO_PICK 100 +static int *chosen_ports = NULL; +static size_t num_chosen_ports = 0; + +static int has_port_been_chosen(int port) { + size_t i; + for (i = 0; i < num_chosen_ports; i++) { + if (chosen_ports[i] == port) { + return 1; + } + } + return 0; +} + +static void free_chosen_ports() { + gpr_free(chosen_ports); +} + +static void chose_port(int port) { + if (chosen_ports == NULL) { + atexit(free_chosen_ports); + } + num_chosen_ports++; + chosen_ports = gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports); + chosen_ports[num_chosen_ports - 1] = port; +} + static int is_port_available(int *port, int is_tcp) { const int proto = is_tcp ? IPPROTO_TCP : 0; const int fd = socket(AF_INET, is_tcp ? SOCK_STREAM : SOCK_DGRAM, proto); @@ -127,6 +154,10 @@ int grpc_pick_unused_port(void) { port = 0; } + if (has_port_been_chosen(port)) { + continue; + } + if (!is_port_available(&port, is_tcp)) { continue; } @@ -142,7 +173,7 @@ int grpc_pick_unused_port(void) { /* TODO(ctiller): consider caching this port in some structure, to avoid handing it out again */ - + chose_port(port); return port; } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index f44883783dd..9f7d3b56a44 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -42,21 +42,25 @@ #include #include #include +#include #include #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/qps_worker.h" +#include "test/core/util/port.h" using std::list; using std::thread; using std::unique_ptr; +using std::deque; using std::vector; namespace grpc { namespace testing { -static vector get_hosts(const string& name) { +static deque get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); - if (!env) return vector(); + if (!env) return deque(); - vector out; + deque out; char* p = env; for (;;) { char* comma = strchr(p, ','); @@ -76,7 +80,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, const ServerConfig& server_config, size_t num_servers, int warmup_seconds, - int benchmark_seconds) { + int benchmark_seconds, + int spawn_local_worker_count) { // ClientContext allocator (all are destroyed at scope exit) list contexts; auto alloc_context = [&contexts]() { @@ -88,6 +93,21 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, auto workers = get_hosts("QPS_WORKERS"); ClientConfig client_config = initial_client_config; + // Spawn some local workers if desired + vector> local_workers; + for (int i = 0; i < abs(spawn_local_worker_count); i++) { + int driver_port = grpc_pick_unused_port_or_die(); + int benchmark_port = grpc_pick_unused_port_or_die(); + local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port)); + char addr[256]; + sprintf(addr, "localhost:%d", driver_port); + if (spawn_local_worker_count < 0) { + workers.push_front(addr); + } else { + workers.push_back(addr); + } + } + // TODO(ctiller): support running multiple configurations, and binpack // client/server pairs // to available workers diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 7a81b701c48..bfa0e68ff86 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -56,7 +56,8 @@ ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config, const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, - int benchmark_seconds); + int benchmark_seconds, + int spawn_local_worker_count); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index f42d538b165..e1f1649af42 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -42,6 +42,7 @@ DEFINE_int32(num_servers, 1, "Number of server binaries"); DEFINE_int32(warmup_seconds, 5, "Warmup time (in seconds)"); DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)"); +DEFINE_int32(local_workers, 0, "Number of local workers to start"); // Common config DEFINE_bool(enable_ssl, false, "Use SSL"); @@ -103,7 +104,8 @@ int main(int argc, char** argv) { auto result = RunScenario(client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, - FLAGS_warmup_seconds, FLAGS_benchmark_seconds); + FLAGS_warmup_seconds, FLAGS_benchmark_seconds, + FLAGS_local_workers); gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc new file mode 100644 index 00000000000..46d70dce529 --- /dev/null +++ b/test/cpp/qps/qps_worker.cc @@ -0,0 +1,233 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "qps_worker.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/client.h" +#include "test/cpp/qps/server.h" + +namespace grpc { +namespace testing { + +std::unique_ptr CreateClient(const ClientConfig& config) { + switch (config.client_type()) { + case ClientType::SYNCHRONOUS_CLIENT: + return (config.rpc_type() == RpcType::UNARY) ? + CreateSynchronousUnaryClient(config) : + CreateSynchronousStreamingClient(config); + case ClientType::ASYNC_CLIENT: + return (config.rpc_type() == RpcType::UNARY) ? + CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); + } + abort(); +} + +std::unique_ptr CreateServer(const ServerConfig& config, int server_port) { + switch (config.server_type()) { + case ServerType::SYNCHRONOUS_SERVER: + return CreateSynchronousServer(config, server_port); + case ServerType::ASYNC_SERVER: + return CreateAsyncServer(config, server_port); + } + abort(); +} + +class WorkerImpl GRPC_FINAL : public Worker::Service { + public: + explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {} + + Status RunTest(ServerContext* ctx, + ServerReaderWriter* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_client.prof"); + Status ret = RunTestBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + Status RunServer(ServerContext* ctx, + ServerReaderWriter* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_server.prof"); + Status ret = RunServerBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + private: + // Protect against multiple clients using this worker at once. + class InstanceGuard { + public: + InstanceGuard(WorkerImpl* impl) + : impl_(impl), acquired_(impl->TryAcquireInstance()) {} + ~InstanceGuard() { + if (acquired_) { + impl_->ReleaseInstance(); + } + } + + bool Acquired() const { return acquired_; } + + private: + WorkerImpl* const impl_; + const bool acquired_; + }; + + bool TryAcquireInstance() { + std::lock_guard g(mu_); + if (acquired_) return false; + acquired_ = true; + return true; + } + + void ReleaseInstance() { + std::lock_guard g(mu_); + GPR_ASSERT(acquired_); + acquired_ = false; + } + + Status RunTestBody(ServerContext* ctx, + ServerReaderWriter* stream) { + ClientArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto client = CreateClient(args.setup()); + if (!client) { + return Status(INVALID_ARGUMENT); + } + ClientStatus status; + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = client->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + Status RunServerBody(ServerContext* ctx, + ServerReaderWriter* stream) { + ServerArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto server = CreateServer(args.setup(), server_port_); + if (!server) { + return Status(INVALID_ARGUMENT); + } + ServerStatus status; + status.set_port(server_port_); + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = server->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + const int server_port_; + + std::mutex mu_; + bool acquired_; +}; + +QpsWorker::QpsWorker(int driver_port, int server_port) { + impl_.reset(new WorkerImpl(server_port)); + + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", driver_port); + + ServerBuilder builder; + builder.AddListeningPort(server_address, InsecureServerCredentials()); + builder.RegisterService(impl_.get()); + + gpr_free(server_address); + + server_ = std::move(builder.BuildAndStart()); +} + +QpsWorker::~QpsWorker() { +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h new file mode 100644 index 00000000000..861588907ec --- /dev/null +++ b/test/cpp/qps/qps_worker.h @@ -0,0 +1,60 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef QPS_WORKER_H +#define QPS_WORKER_H + +#include + +namespace grpc { + +class Server; + +namespace testing { + +class WorkerImpl; + +class QpsWorker { + public: + QpsWorker(int driver_port, int server_port); + ~QpsWorker(); + + private: + std::unique_ptr impl_; + std::unique_ptr server_; +}; + +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/smoke_test.cc b/test/cpp/qps/smoke_test.cc index 5cdabb88a0d..b0cc0c30394 100644 --- a/test/cpp/qps/smoke_test.cc +++ b/test/cpp/qps/smoke_test.cc @@ -58,7 +58,7 @@ static void RunSynchronousUnaryPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / @@ -87,7 +87,7 @@ static void RunSynchronousStreamingPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / @@ -117,7 +117,7 @@ static void RunAsyncUnaryPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / @@ -147,7 +147,7 @@ static void RunQPS() { server_config.set_enable_ssl(false); server_config.set_threads(4); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); auto qps = result.latencies.Count() / diff --git a/test/cpp/qps/smoke_test.sh b/test/cpp/qps/smoke_test.sh deleted file mode 100755 index ba7f0a4f27d..00000000000 --- a/test/cpp/qps/smoke_test.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/sh - -# performs a single qps run with one client and one server - -set -ex - -cd $(dirname $0)/../../.. - -killall qps_worker || true - -config=opt - -NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'` - -make CONFIG=$config qps_worker qps_smoke_test -j$NUMCPUS - -bins/$config/qps_worker -driver_port 10000 -server_port 10001 & -PID1=$! -bins/$config/qps_worker -driver_port 10010 -server_port 10011 & -PID2=$! - -export QPS_WORKERS="localhost:10000,localhost:10010" - -bins/$config/qps_smoke_test $* - -kill -2 $PID1 $PID2 -wait - diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index b6830cc0557..1ef5313b661 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -31,33 +31,15 @@ * */ -#include -#include -#include -#include -#include -#include -#include - #include +#include +#include + #include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include "test/core/util/grpc_profiler.h" -#include "test/cpp/util/create_test_channel.h" -#include "test/cpp/qps/qpstest.pb.h" -#include "test/cpp/qps/client.h" -#include "test/cpp/qps/server.h" + +#include "qps_worker.h" DEFINE_int32(driver_port, 0, "Driver server port."); DEFINE_int32(server_port, 0, "Spawned server port."); @@ -76,167 +58,8 @@ static void sigint_handler(int x) {got_sigint = true;} namespace grpc { namespace testing { -std::unique_ptr CreateClient(const ClientConfig& config) { - switch (config.client_type()) { - case ClientType::SYNCHRONOUS_CLIENT: - return (config.rpc_type() == RpcType::UNARY) ? - CreateSynchronousUnaryClient(config) : - CreateSynchronousStreamingClient(config); - case ClientType::ASYNC_CLIENT: - return (config.rpc_type() == RpcType::UNARY) ? - CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); - } - abort(); -} - -std::unique_ptr CreateServer(const ServerConfig& config) { - switch (config.server_type()) { - case ServerType::SYNCHRONOUS_SERVER: - return CreateSynchronousServer(config, FLAGS_server_port); - case ServerType::ASYNC_SERVER: - return CreateAsyncServer(config, FLAGS_server_port); - } - abort(); -} - -class WorkerImpl GRPC_FINAL : public Worker::Service { - public: - WorkerImpl() : acquired_(false) {} - - Status RunTest(ServerContext* ctx, - ServerReaderWriter* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - - grpc_profiler_start("qps_client.prof"); - Status ret = RunTestBody(ctx,stream); - grpc_profiler_stop(); - return ret; - } - - Status RunServer(ServerContext* ctx, - ServerReaderWriter* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - - grpc_profiler_start("qps_server.prof"); - Status ret = RunServerBody(ctx,stream); - grpc_profiler_stop(); - return ret; - } - - private: - // Protect against multiple clients using this worker at once. - class InstanceGuard { - public: - InstanceGuard(WorkerImpl* impl) - : impl_(impl), acquired_(impl->TryAcquireInstance()) {} - ~InstanceGuard() { - if (acquired_) { - impl_->ReleaseInstance(); - } - } - - bool Acquired() const { return acquired_; } - - private: - WorkerImpl* const impl_; - const bool acquired_; - }; - - bool TryAcquireInstance() { - std::lock_guard g(mu_); - if (acquired_) return false; - acquired_ = true; - return true; - } - - void ReleaseInstance() { - std::lock_guard g(mu_); - GPR_ASSERT(acquired_); - acquired_ = false; - } - - Status RunTestBody(ServerContext* ctx, - ServerReaderWriter* stream) { - ClientArgs args; - if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); - } - if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); - } - auto client = CreateClient(args.setup()); - if (!client) { - return Status(INVALID_ARGUMENT); - } - ClientStatus status; - if (!stream->Write(status)) { - return Status(UNKNOWN); - } - while (stream->Read(&args)) { - if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); - } - *status.mutable_stats() = client->Mark(); - stream->Write(status); - } - - return Status::OK; - } - - Status RunServerBody(ServerContext* ctx, - ServerReaderWriter* stream) { - ServerArgs args; - if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); - } - if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); - } - auto server = CreateServer(args.setup()); - if (!server) { - return Status(INVALID_ARGUMENT); - } - ServerStatus status; - status.set_port(FLAGS_server_port); - if (!stream->Write(status)) { - return Status(UNKNOWN); - } - while (stream->Read(&args)) { - if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); - } - *status.mutable_stats() = server->Mark(); - stream->Write(status); - } - - return Status::OK; - } - - std::mutex mu_; - bool acquired_; -}; - static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - - WorkerImpl service; - - ServerBuilder builder; - builder.AddListeningPort(server_address, InsecureServerCredentials()); - builder.RegisterService(&service); - - gpr_free(server_address); - - auto server = builder.BuildAndStart(); + QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); while (!got_sigint) { std::this_thread::sleep_for(std::chrono::seconds(5)); From d4df088c750341cb3f7a928a42d421135ea7e38e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Apr 2015 14:40:28 -0700 Subject: [PATCH 2/2] Remove TODO --- test/core/util/port_posix.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 726ee3bd6c7..b07df391f9a 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -171,8 +171,6 @@ int grpc_pick_unused_port(void) { continue; } - /* TODO(ctiller): consider caching this port in some structure, to avoid - handing it out again */ chose_port(port); return port; }