Merge branch 'qps_driver' of github.com:ctiller/grpc into qps_driver

pull/837/head
Craig Tiller 10 years ago
commit 96cca4e83a
  1. 10
      Makefile
  2. 5
      build.json
  3. 25
      test/cpp/qps/client_async.cc
  4. 0
      test/cpp/qps/client_sync.cc
  5. 30
      test/cpp/qps/server.h
  6. 140
      test/cpp/qps/server_async.cc
  7. 31
      test/cpp/qps/server_sync.cc
  8. 4
      test/cpp/qps/worker.cc

@ -8178,9 +8178,10 @@ endif
QPS_WORKER_SRC = \
test/cpp/qps/client.cc \
test/cpp/qps/client_async.cc \
test/cpp/qps/server.cc \
test/cpp/qps/client_sync.cc \
test/cpp/qps/server_async.cc \
test/cpp/qps/server_sync.cc \
test/cpp/qps/worker.cc \
QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC))))
@ -8211,9 +8212,10 @@ endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep)

@ -1846,9 +1846,10 @@
"test/cpp/qps/server.h"
],
"src": [
"test/cpp/qps/client.cc",
"test/cpp/qps/client_async.cc",
"test/cpp/qps/server.cc",
"test/cpp/qps/client_sync.cc",
"test/cpp/qps/server_async.cc",
"test/cpp/qps/server_sync.cc",
"test/cpp/qps/worker.cc"
],
"deps": [

@ -60,6 +60,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
virtual bool RunNextState() = 0; // do next state, return false if steps done
virtual void StartNewClone() = 0;
static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); }
static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t);
@ -83,7 +84,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
req_(req),
response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done),
callback_(on_done), start_req_(start_req),
start_(Timer::Now()),
response_reader_(
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
@ -93,6 +94,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
hist->Add((Timer::Now() - start_) * 1e9);
}
void StartNewClone() {
new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_);
}
private:
bool ReqSent() {
next_state_ = &ClientRpcContextUnaryImpl::RespDone;
@ -113,6 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub *, grpc::ClientContext *, const RequestType &,
void *)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@ -152,6 +161,19 @@ class AsyncClient GRPC_FINAL : public Client {
StartThreads(config.async_client_threads());
}
~AsyncClient() GRPC_OVERRIDE {
EndThreads();
for (auto& cq : cli_cqs_) {
cq->Shutdown();
void *got_tag;
bool ok;
while (cq->Next(&got_tag, &ok)) {
delete ClientRpcContext::detag(got_tag);
}
}
}
void ThreadFunc(Histogram *histogram, size_t thread_idx) {
void *got_tag;
bool ok;
@ -162,6 +184,7 @@ class AsyncClient GRPC_FINAL : public Client {
// call the callback and then delete it
ctx->report_stats(histogram);
ctx->RunNextState();
ctx->StartNewClone();
delete ctx;
}
}

@ -34,6 +34,7 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
@ -41,9 +42,36 @@ namespace testing {
class Server {
public:
Server():timer_(new Timer) {}
virtual ~Server() {}
virtual ServerStats Mark() = 0;
ServerStats Mark() {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
private:
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,

@ -51,104 +51,37 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(port, 0, "Server port.");
DEFINE_int32(server_threads, 4, "Number of server threads.");
namespace grpc {
namespace testing {
using grpc::CompletionQueue;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ThreadPool;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::ServerStats;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
static double time_double(struct timeval *tv) {
return tv->tv_sec + 1e-6 * tv->tv_usec;
}
static bool SetPayload(PayloadType type, int size, Payload *payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
namespace {
class AsyncQpsServerTest {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_port);
gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
request_stats_ =
std::bind(&TestService::AsyncService::RequestCollectServerStats,
&async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
contexts_.push_front(
new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
request_stats_, CollectServerStats));
}
}
~AsyncQpsServerTest() {
server_->Shutdown();
void *ignored_tag;
bool ignored_ok;
srv_cq_.Shutdown();
while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
}
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
}
for (auto& thr: threads_) {
thr.join();
}
}
void ServeRpcs(int num_threads) {
for (int i = 0; i < num_threads; i++) {
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@ -166,8 +99,16 @@ class AsyncQpsServerTest {
return;
}));
}
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
~AsyncQpsServerTest() {
server_->Shutdown();
srv_cq_.Shutdown();
for (auto& thr: threads_) {
thr.join();
}
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
}
}
@ -240,17 +181,6 @@ class AsyncQpsServerTest {
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
static Status CollectServerStats(const StatsRequest *,
ServerStats *response) {
struct rusage usage;
struct timeval tv;
gettimeofday(&tv, NULL);
getrusage(RUSAGE_SELF, &usage);
response->set_time_now(time_double(&tv));
response->set_time_user(time_double(&usage.ru_utime));
response->set_time_system(time_double(&usage.ru_stime));
return Status::OK;
}
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@ -264,40 +194,16 @@ class AsyncQpsServerTest {
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
std::unique_ptr<Server> server_;
std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
std::function<void(ServerContext *, StatsRequest *,
grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
} // namespace
static void RunServer() {
AsyncQpsServerTest server;
grpc_profiler_start("qps_server_async.prof");
server.ServeRpcs(FLAGS_server_threads);
grpc_profiler_stop();
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
int main(int argc, char **argv) {
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
GPR_ASSERT(FLAGS_port != 0);
GPR_ASSERT(!FLAGS_enable_ssl);
signal(SIGINT, sigint_handler);
RunServer();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return 0;
}
}// namespace testing
}// namespace grpc

@ -57,24 +57,12 @@
namespace grpc {
namespace testing {
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
return true;
}
class TestServiceImpl GRPC_FINAL : public TestService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
if (request->has_response_size() && request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
@ -87,21 +75,7 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: thread_pool_(config.threads()),
impl_(MakeImpl(port)),
timer_(new Timer) {}
ServerStats Mark() GRPC_OVERRIDE {
std::unique_ptr<Timer> timer(new Timer);
timer.swap(timer_);
auto timer_result = timer->Mark();
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
@ -120,7 +94,6 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
TestServiceImpl service_;
ThreadPool thread_pool_;
std::unique_ptr<grpc::Server> impl_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(

@ -70,8 +70,6 @@ using namespace gflags;
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
namespace grpc {
namespace testing {
@ -90,7 +88,7 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER:
abort(); // return CreateAsyncServer(config, FLAGS_server_port);
return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}

Loading…
Cancel
Save