Async client progress

pull/837/head
Craig Tiller 10 years ago
parent f282514389
commit 8856875900
  1. 2
      Makefile
  2. 1
      build.json
  3. 120
      test/cpp/qps/client.cc
  4. 120
      test/cpp/qps/client.h
  5. 149
      test/cpp/qps/client_async.cc
  6. 4
      test/cpp/qps/qps_driver.cc
  7. 3
      test/cpp/qps/qpstest.proto
  8. 2
      test/cpp/qps/worker.cc

@ -8179,6 +8179,7 @@ endif
QPS_WORKER_SRC = \
test/cpp/qps/client.cc \
test/cpp/qps/client_async.cc \
test/cpp/qps/server.cc \
test/cpp/qps/worker.cc \
@ -8211,6 +8212,7 @@ endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a

@ -1847,6 +1847,7 @@
],
"src": [
"test/cpp/qps/client.cc",
"test/cpp/qps/client_async.cc",
"test/cpp/qps/server.cc",
"test/cpp/qps/worker.cc"
],

@ -63,118 +63,26 @@ namespace testing {
class SynchronousClient GRPC_FINAL : public Client {
public:
SynchronousClient(const ClientConfig& config) : timer_(new Timer) {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
auto* stub = channels_.back().get_stub();
for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) {
threads_.emplace_back(new Thread(stub, config));
}
}
SynchronousClient(const ClientConfig& config) : Client(config) {
size_t num_threads = config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads);
StartThreads(num_threads);
}
ClientStats Mark() {
Histogram latencies;
std::vector<Histogram> to_merge(threads_.size());
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
}
auto timer_result = timer->Mark();
~SynchronousClient() {
EndThreads();
}
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
void ThreadFunc(Histogram* histogram, size_t thread_idx) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]);
histogram->Add((Timer::Now() - start) * 1e9);
}
private:
class Thread {
public:
Thread(TestService::Stub* stub, const ClientConfig& config)
: stub_(stub),
config_(config),
done_(false),
new_(nullptr),
impl_([this]() {
SimpleRequest request;
SimpleResponse response;
request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(config_.payload_size());
for (;;) {
{
std::lock_guard<std::mutex> g(mu_);
if (done_) return;
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
double start = Timer::Now();
grpc::ClientContext context;
grpc::Status s = stub_->UnaryCall(&context, request, &response);
histogram_.Add((Timer::Now() - start) * 1e9);
}
}) {}
~Thread() {
{
std::lock_guard<std::mutex> g(mu_);
done_ = true;
}
impl_.join();
}
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
cv_.wait(g, [this]() { return new_ == nullptr; });
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
TestService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_;
Histogram histogram_;
std::thread impl_;
};
class ClientChannelInfo {
public:
explicit ClientChannelInfo(const grpc::string& target,
const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
std::vector<SimpleResponse> responses_;
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {

@ -34,6 +34,8 @@
#ifndef TEST_QPS_CLIENT_H
#define TEST_QPS_CLIENT_H
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
@ -41,9 +43,125 @@ namespace testing {
class Client {
public:
explicit Client(const ClientConfig& config) : timer_(new Timer) {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
}
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size());
}
virtual ~Client() {}
virtual ClientStats Mark() = 0;
ClientStats Mark() {
Histogram latencies;
std::vector<Histogram> to_merge(threads_.size());
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
std::unique_ptr<Timer> timer(new Timer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
}
auto timer_result = timer->Mark();
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
return stats;
}
protected:
SimpleRequest request_;
class ClientChannelInfo {
public:
explicit ClientChannelInfo(const grpc::string& target,
const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
void StartThreads(size_t num_threads) {
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}
void EndThreads() {
threads_.clear();
}
virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
private:
class Thread {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_(nullptr),
impl_([this, idx, client]() {
for (;;) {
// run the loop body
client->ThreadFunc(&histogram_, idx);
// lock, see if we're done
std::lock_guard<std::mutex> g(mu_);
if (done_) return;
// also check if we're marking, and swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
}) {}
~Thread() {
{
std::lock_guard<std::mutex> g(mu_);
done_ = true;
}
impl_.join();
}
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
cv_.wait(g, [this]() { return new_ == nullptr; });
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
TestService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_;
Histogram histogram_;
std::thread impl_;
};
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);

@ -49,50 +49,11 @@
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/qpstest.pb.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(server_port, 0, "Server port.");
DEFINE_string(server_host, "127.0.0.1", "Server host.");
DEFINE_int32(client_threads, 4, "Number of client threads.");
// We have a configurable number of channels for sending RPCs.
// RPCs are sent round-robin on the available channels by the
// various threads. Interesting cases are 1 global channel or
// 1 per-thread channel, but we can support any number.
// The channels are assigned round-robin on an RPC by RPC basis
// rather than just at initialization time in order to also measure the
// impact of cache thrashing caused by channel changes. This is an issue
// if you are not in one of the above "interesting cases"
DEFINE_int32(client_channels, 4, "Number of client channels.");
DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread.");
DEFINE_int32(payload_size, 1, "Payload size in bytes");
// Alternatively, specify parameters for test as a workload so that multiple
// tests are initiated back-to-back. This is convenient for keeping a borg
// allocation consistent. This is a space-separated list of
// [threads channels num_rpcs payload_size ]*
DEFINE_string(workload, "", "Workload parameters");
using grpc::ChannelInterface;
using grpc::CreateTestChannel;
using grpc::testing::ServerStats;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
static double now() {
gpr_timespec tv = gpr_now();
return 1e9 * tv.tv_sec + tv.tv_nsec;
}
namespace grpc {
namespace testing {
class ClientRpcContext {
public:
@ -103,8 +64,9 @@ class ClientRpcContext {
static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t);
}
virtual void report_stats(gpr_histogram *hist) = 0;
virtual void report_stats(Histogram *hist) = 0;
};
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
@ -113,22 +75,22 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
const RequestType &req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub *, grpc::ClientContext *, const RequestType &,
void *)> start_req,
TestService::Stub *, grpc::ClientContext *, const RequestType &,
void *)> start_req,
std::function<void(grpc::Status, ResponseType *)> on_done)
: context_(),
stub_(stub),
stub_(stub),
req_(req),
response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done),
start_(now()),
start_(Timer::Now()),
response_reader_(
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); }
void report_stats(gpr_histogram *hist) GRPC_OVERRIDE {
gpr_histogram_add(hist, now() - start_);
void report_stats(Histogram *hist) GRPC_OVERRIDE {
hist->Add((Timer::Now() - start_) * 1e9);
}
private:
@ -157,6 +119,64 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_reader_;
};
class AsyncClient GRPC_FINAL : public Client {
public:
explicit AsyncClient(const ClientConfig& config) : Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
}
auto payload_size = config.payload_size();
auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) {
GPR_ASSERT(s.IsOk() && (response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) &&
(response->payload().body().length() ==
static_cast<size_t>(payload_size)));
};
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto& channel : channels_) {
auto *cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, const SimpleRequest& request, void *tag) {
return stub->AsyncUnaryCall(ctx, request, cq, tag);
};
TestService::Stub* stub = channel.get_stub();
const SimpleRequest& request = request_;
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(stub, request, start_req, check_done);
}
}
StartThreads(config.async_client_threads());
}
void ThreadFunc(Histogram *histogram, size_t thread_idx) {
void *got_tag;
bool ok;
cli_cqs_[thread_idx]->Next(&got_tag, &ok);
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState() == false) {
// call the callback and then delete it
ctx->report_stats(histogram);
ctx->RunNextState();
delete ctx;
}
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args) {
return std::unique_ptr<Client>(new AsyncClient(args));
}
} // namespace testing
} // namespace grpc
#if 0
static void RunTest(const int client_threads, const int client_channels,
const int num_rpcs, const int payload_size) {
gpr_log(GPR_INFO,
@ -173,23 +193,7 @@ static void RunTest(const int client_threads, const int client_channels,
std::ostringstream oss;
oss << FLAGS_server_host << ":" << FLAGS_server_port;
class ClientChannelInfo {
public:
explicit ClientChannelInfo(const grpc::string &server)
: channel_(CreateTestChannel(server, FLAGS_enable_ssl)),
stub_(TestService::NewStub(channel_)) {}
ChannelInterface *get_channel() { return channel_.get(); }
TestService::Stub *get_stub() { return stub_.get(); }
private:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<TestService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels;
for (int i = 0; i < client_channels; i++) {
channels.push_back(ClientChannelInfo(oss.str()));
}
std::vector<std::thread> threads; // Will add threads when ready to execute
std::vector< ::gpr_histogram *> thread_stats(client_threads);
@ -204,12 +208,6 @@ static void RunTest(const int client_threads, const int client_channels,
grpc_profiler_start("qps_client_async.prof");
auto CheckDone = [=](grpc::Status s, SimpleResponse *response) {
GPR_ASSERT(s.IsOk() && (response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) &&
(response->payload().body().length() ==
static_cast<size_t>(payload_size)));
};
for (int i = 0; i < client_threads; i++) {
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
@ -225,8 +223,6 @@ static void RunTest(const int client_threads, const int client_channels,
request.set_response_size(payload_size);
grpc::CompletionQueue cli_cq;
auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, _1,
_2, _3, &cli_cq, _4);
int rpcs_sent = 0;
while (rpcs_sent < num_rpcs) {
@ -339,3 +335,4 @@ int main(int argc, char **argv) {
grpc_shutdown();
return 0;
}
#endif

@ -53,6 +53,7 @@ DEFINE_int32(outstanding_rpcs_per_channel, 1,
DEFINE_int32(client_channels, 1, "Number of client channels");
DEFINE_int32(payload_size, 1, "Payload size");
DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
@ -84,6 +85,7 @@ int main(int argc, char **argv) {
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
client_config.set_payload_size(FLAGS_payload_size);
client_config.set_async_client_threads(FLAGS_async_client_threads);
ServerConfig server_config;
server_config.set_server_type(server_type);
@ -93,6 +95,8 @@ int main(int argc, char **argv) {
auto result = RunScenario(client_config, FLAGS_num_clients, server_config,
FLAGS_num_servers);
gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; }));
gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us",
result.latencies.Percentile(50) / 1000, result.latencies.Percentile(95) / 1000,
result.latencies.Percentile(99) / 1000, result.latencies.Percentile(99.9) / 1000);

@ -94,8 +94,11 @@ message ClientConfig {
required int32 outstanding_rpcs_per_channel = 4;
required int32 client_channels = 5;
required int32 payload_size = 6;
// only for async client:
optional int32 async_client_threads = 7;
}
// Request current stats
message Mark {}
message ClientArgs {

@ -80,7 +80,7 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
case ClientType::SYNCHRONOUS_CLIENT:
return CreateSynchronousClient(config);
case ClientType::ASYNC_CLIENT:
abort(); // return CreateAsyncClient(config);
return CreateAsyncClient(config);
}
abort();
}

Loading…
Cancel
Save