From d6479d6cc42ca180bcfc219751cf2a6f73f87dd4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 12:50:11 -0800 Subject: [PATCH] Async server works --- Makefile | 2 + build.json | 1 + test/cpp/qps/server.cc | 31 +------- test/cpp/qps/server.h | 30 +++++++- test/cpp/qps/server_async.cc | 140 ++++++----------------------------- test/cpp/qps/worker.cc | 2 +- 6 files changed, 58 insertions(+), 148 deletions(-) diff --git a/Makefile b/Makefile index 36eedb95add..aef6ff3a639 100644 --- a/Makefile +++ b/Makefile @@ -8181,6 +8181,7 @@ QPS_WORKER_SRC = \ test/cpp/qps/client.cc \ test/cpp/qps/client_async.cc \ test/cpp/qps/server.cc \ + test/cpp/qps/server_async.cc \ test/cpp/qps/worker.cc \ QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC)))) @@ -8214,6 +8215,7 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep) diff --git a/build.json b/build.json index 08b1e1842c3..a8dc2e6bec5 100644 --- a/build.json +++ b/build.json @@ -1849,6 +1849,7 @@ "test/cpp/qps/client.cc", "test/cpp/qps/client_async.cc", "test/cpp/qps/server.cc", + "test/cpp/qps/server_async.cc", "test/cpp/qps/worker.cc" ], "deps": [ diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 8424dbafe97..e598fb51ae0 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -57,24 +57,12 @@ namespace grpc { namespace testing { -static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - class TestServiceImpl GRPC_FINAL : public TestService::Service { public: Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) override { if (request->has_response_size() && request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), + if (!Server::SetPayload(request->response_type(), request->response_size(), response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } @@ -87,21 +75,7 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { public: SynchronousServer(const ServerConfig& config, int port) : thread_pool_(config.threads()), - impl_(MakeImpl(port)), - timer_(new Timer) {} - - ServerStats Mark() GRPC_OVERRIDE { - std::unique_ptr timer(new Timer); - timer.swap(timer_); - - auto timer_result = timer->Mark(); - - ServerStats stats; - stats.set_time_elapsed(timer_result.wall); - stats.set_time_system(timer_result.system); - stats.set_time_user(timer_result.user); - return stats; - } + impl_(MakeImpl(port)) {} private: std::unique_ptr MakeImpl(int port) { @@ -120,7 +94,6 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { TestServiceImpl service_; ThreadPool thread_pool_; std::unique_ptr impl_; - std::unique_ptr timer_; }; std::unique_ptr CreateSynchronousServer( diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 3542c17a6a2..ca22d7ca1cd 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,6 +34,7 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H +#include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.pb.h" namespace grpc { @@ -41,9 +42,36 @@ namespace testing { class Server { public: + Server():timer_(new Timer) {} virtual ~Server() {} - virtual ServerStats Mark() = 0; + ServerStats Mark() { + std::unique_ptr timer(new Timer); + timer.swap(timer_); + + auto timer_result = timer->Mark(); + + ServerStats stats; + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + + static bool SetPayload(PayloadType type, int size, Payload* payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr body(new char[size]()); + payload->set_body(body.get(), size); + return true; + } + + private: + std::unique_ptr timer_; }; std::unique_ptr CreateSynchronousServer(const ServerConfig& config, diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c006262fc34..741a85802ad 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -51,104 +51,37 @@ #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" #include #include -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +namespace grpc { + namespace testing { -using grpc::CompletionQueue; -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval *tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload *payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class AsyncQpsServerTest { +class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { char *server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); + gpr_join_host_port(&server_address, "::", port); ServerBuilder builder; builder.AddPort(server_address); + gpr_free(server_address); builder.RegisterAsyncService(&async_service_); server_ = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - gpr_free(server_address); using namespace std::placeholders; request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, _1, _2, _3, &srv_cq_, _4); - request_stats_ = - std::bind(&TestService::AsyncService::RequestCollectServerStats, - &async_service_, _1, _2, _3, &srv_cq_, _4); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl( request_unary_, UnaryCall)); - contexts_.push_front( - new ServerRpcContextUnaryImpl( - request_stats_, CollectServerStats)); - } - } - ~AsyncQpsServerTest() { - server_->Shutdown(); - void *ignored_tag; - bool ignored_ok; - srv_cq_.Shutdown(); - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) { } - while (!contexts_.empty()) { - delete contexts_.front(); - contexts_.pop_front(); - } - for (auto& thr: threads_) { - thr.join(); - } - } - void ServeRpcs(int num_threads) { - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -166,8 +99,16 @@ class AsyncQpsServerTest { return; })); } - while (!got_sigint) { - std::this_thread::sleep_for(std::chrono::seconds(5)); + } + ~AsyncQpsServerTest() { + server_->Shutdown(); + srv_cq_.Shutdown(); + for (auto& thr: threads_) { + thr.join(); + } + while (!contexts_.empty()) { + delete contexts_.front(); + contexts_.pop_front(); } } @@ -240,17 +181,6 @@ class AsyncQpsServerTest { grpc::ServerAsyncResponseWriter response_writer_; }; - static Status CollectServerStats(const StatsRequest *, - ServerStats *response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } static Status UnaryCall(const SimpleRequest *request, SimpleResponse *response) { if (request->has_response_size() && request->response_size() > 0) { @@ -264,40 +194,16 @@ class AsyncQpsServerTest { CompletionQueue srv_cq_; TestService::AsyncService async_service_; std::vector threads_; - std::unique_ptr server_; + std::unique_ptr server_; std::function *, void *)> request_unary_; - std::function *, void *)> - request_stats_; std::forward_list contexts_; }; -} // namespace - -static void RunServer() { - AsyncQpsServerTest server; - - grpc_profiler_start("qps_server_async.prof"); - - server.ServeRpcs(FLAGS_server_threads); - - grpc_profiler_stop(); +std::unique_ptr CreateAsyncServer(const ServerConfig& config, int port) { + return std::unique_ptr(new AsyncQpsServerTest(config, port)); } -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - - signal(SIGINT, sigint_handler); - - RunServer(); - - grpc_shutdown(); - google::protobuf::ShutdownProtobufLibrary(); - - return 0; -} + }// namespace testing +}// namespace grpc diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 4a2e798a477..a8d5752120d 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -90,7 +90,7 @@ std::unique_ptr CreateServer(const ServerConfig& config) { case ServerType::SYNCHRONOUS_SERVER: return CreateSynchronousServer(config, FLAGS_server_port); case ServerType::ASYNC_SERVER: - abort(); // return CreateAsyncServer(config, FLAGS_server_port); + return CreateAsyncServer(config, FLAGS_server_port); } abort(); }