Async server works

pull/837/head
Craig Tiller 10 years ago
parent ef63839042
commit d6479d6cc4
  1. 2
      Makefile
  2. 1
      build.json
  3. 31
      test/cpp/qps/server.cc
  4. 30
      test/cpp/qps/server.h
  5. 140
      test/cpp/qps/server_async.cc
  6. 2
      test/cpp/qps/worker.cc

@ -8181,6 +8181,7 @@ QPS_WORKER_SRC = \
test/cpp/qps/client.cc \
test/cpp/qps/client_async.cc \
test/cpp/qps/server.cc \
test/cpp/qps/server_async.cc \
test/cpp/qps/worker.cc \
QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC))))
@ -8214,6 +8215,7 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep)

@ -1849,6 +1849,7 @@
"test/cpp/qps/client.cc",
"test/cpp/qps/client_async.cc",
"test/cpp/qps/server.cc",
"test/cpp/qps/server_async.cc",
"test/cpp/qps/worker.cc"
],
"deps": [

@ -57,24 +57,12 @@
namespace grpc {
namespace testing {
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
class TestServiceImpl GRPC_FINAL : public TestService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
if (request->has_response_size() && request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
@ -87,21 +75,7 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: thread_pool_(config.threads()),
impl_(MakeImpl(port)),
timer_(new Timer) {}
ServerStats Mark() GRPC_OVERRIDE {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
@ -120,7 +94,6 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
TestServiceImpl service_;
ThreadPool thread_pool_;
std::unique_ptr<grpc::Server> impl_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(

@ -34,6 +34,7 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
@ -41,9 +42,36 @@ namespace testing {
class Server {
public:
Server():timer_(new Timer) {}
virtual ~Server() {}
virtual ServerStats Mark() = 0;
ServerStats Mark() {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
private:
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,

@ -51,104 +51,37 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(port, 0, "Server port.");
DEFINE_int32(server_threads, 4, "Number of server threads.");
namespace grpc {
namespace testing {
using grpc::CompletionQueue;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ThreadPool;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::ServerStats;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
static double time_double(struct timeval *tv) {
return tv->tv_sec + 1e-6 * tv->tv_usec;
}
static bool SetPayload(PayloadType type, int size, Payload *payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
namespace {
class AsyncQpsServerTest {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_port);
gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
request_stats_ =
std::bind(&TestService::AsyncService::RequestCollectServerStats,
&async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
contexts_.push_front(
new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
request_stats_, CollectServerStats));
}
}
~AsyncQpsServerTest() {
server_->Shutdown();
void *ignored_tag;
bool ignored_ok;
srv_cq_.Shutdown();
while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
}
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
}
for (auto& thr: threads_) {
thr.join();
}
}
void ServeRpcs(int num_threads) {
for (int i = 0; i < num_threads; i++) {
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@ -166,8 +99,16 @@ class AsyncQpsServerTest {
return;
}));
}
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
~AsyncQpsServerTest() {
server_->Shutdown();
srv_cq_.Shutdown();
for (auto& thr: threads_) {
thr.join();
}
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
}
}
@ -240,17 +181,6 @@ class AsyncQpsServerTest {
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
static Status CollectServerStats(const StatsRequest *,
ServerStats *response) {
struct rusage usage;
struct timeval tv;
gettimeofday(&tv, NULL);
getrusage(RUSAGE_SELF, &usage);
response->set_time_now(time_double(&tv));
response->set_time_user(time_double(&usage.ru_utime));
response->set_time_system(time_double(&usage.ru_stime));
return Status::OK;
}
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@ -264,40 +194,16 @@ class AsyncQpsServerTest {
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
std::unique_ptr<Server> server_;
std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
std::function<void(ServerContext *, StatsRequest *,
grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
} // namespace
static void RunServer() {
AsyncQpsServerTest server;
grpc_profiler_start("qps_server_async.prof");
server.ServeRpcs(FLAGS_server_threads);
grpc_profiler_stop();
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
int main(int argc, char **argv) {
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
GPR_ASSERT(FLAGS_port != 0);
GPR_ASSERT(!FLAGS_enable_ssl);
signal(SIGINT, sigint_handler);
RunServer();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return 0;
}
}// namespace testing
}// namespace grpc

@ -90,7 +90,7 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER:
abort(); // return CreateAsyncServer(config, FLAGS_server_port);
return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}

Loading…
Cancel
Save