From 8856875900b6f9ae00cd5ffa311a9ed33c40c7e1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 4 Mar 2015 10:50:43 -0800 Subject: [PATCH] Async client progress --- Makefile | 2 + build.json | 1 + test/cpp/qps/client.cc | 120 ++++------------------------ test/cpp/qps/client.h | 120 +++++++++++++++++++++++++++- test/cpp/qps/client_async.cc | 149 +++++++++++++++++------------------ test/cpp/qps/qps_driver.cc | 4 + test/cpp/qps/qpstest.proto | 3 + test/cpp/qps/worker.cc | 2 +- 8 files changed, 217 insertions(+), 184 deletions(-) diff --git a/Makefile b/Makefile index a1523e2254d..36eedb95add 100644 --- a/Makefile +++ b/Makefile @@ -8179,6 +8179,7 @@ endif QPS_WORKER_SRC = \ test/cpp/qps/client.cc \ + test/cpp/qps/client_async.cc \ test/cpp/qps/server.cc \ test/cpp/qps/worker.cc \ @@ -8211,6 +8212,7 @@ endif endif $(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a diff --git a/build.json b/build.json index 922e9159d16..08b1e1842c3 100644 --- a/build.json +++ b/build.json @@ -1847,6 +1847,7 @@ ], "src": [ "test/cpp/qps/client.cc", + "test/cpp/qps/client_async.cc", "test/cpp/qps/server.cc", "test/cpp/qps/worker.cc" ], diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc index 877007038ef..0be01e137fb 100644 --- a/test/cpp/qps/client.cc +++ b/test/cpp/qps/client.cc @@ -63,118 +63,26 @@ namespace testing { class SynchronousClient GRPC_FINAL : public Client { public: - SynchronousClient(const ClientConfig& config) : timer_(new Timer) { - for (int i = 0; i < config.client_channels(); i++) { - channels_.push_back(ClientChannelInfo( - config.server_targets(i % config.server_targets_size()), config)); - auto* stub = channels_.back().get_stub(); - for (int j = 0; j < config.outstanding_rpcs_per_channel(); j++) { - threads_.emplace_back(new Thread(stub, config)); - } - } + SynchronousClient(const ClientConfig& config) : Client(config) { + size_t num_threads = config.outstanding_rpcs_per_channel() * config.client_channels(); + responses_.resize(num_threads); + StartThreads(num_threads); } - ClientStats Mark() { - Histogram latencies; - std::vector to_merge(threads_.size()); - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); - } - std::unique_ptr timer(new Timer); - timer_.swap(timer); - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); - latencies.Merge(&to_merge[i]); - } - - auto timer_result = timer->Mark(); + ~SynchronousClient() { + EndThreads(); + } - ClientStats stats; - latencies.FillProto(stats.mutable_latencies()); - stats.set_time_elapsed(timer_result.wall); - stats.set_time_system(timer_result.system); - stats.set_time_user(timer_result.user); - return stats; + void ThreadFunc(Histogram* histogram, size_t thread_idx) { + auto* stub = channels_[thread_idx % channels_.size()].get_stub(); + double start = Timer::Now(); + grpc::ClientContext context; + grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); + histogram->Add((Timer::Now() - start) * 1e9); } private: - class Thread { - public: - Thread(TestService::Stub* stub, const ClientConfig& config) - : stub_(stub), - config_(config), - done_(false), - new_(nullptr), - impl_([this]() { - SimpleRequest request; - SimpleResponse response; - request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(config_.payload_size()); - for (;;) { - { - std::lock_guard g(mu_); - if (done_) return; - if (new_) { - new_->Swap(&histogram_); - new_ = nullptr; - cv_.notify_one(); - } - } - double start = Timer::Now(); - grpc::ClientContext context; - grpc::Status s = stub_->UnaryCall(&context, request, &response); - histogram_.Add((Timer::Now() - start) * 1e9); - } - }) {} - - ~Thread() { - { - std::lock_guard g(mu_); - done_ = true; - } - impl_.join(); - } - - void BeginSwap(Histogram* n) { - std::lock_guard g(mu_); - new_ = n; - } - - void EndSwap() { - std::unique_lock g(mu_); - cv_.wait(g, [this]() { return new_ == nullptr; }); - } - - private: - Thread(const Thread&); - Thread& operator=(const Thread&); - - TestService::Stub* stub_; - ClientConfig config_; - std::mutex mu_; - std::condition_variable cv_; - bool done_; - Histogram* new_; - Histogram histogram_; - std::thread impl_; - }; - - class ClientChannelInfo { - public: - explicit ClientChannelInfo(const grpc::string& target, - const ClientConfig& config) - : channel_(CreateTestChannel(target, config.enable_ssl())), - stub_(TestService::NewStub(channel_)) {} - ChannelInterface* get_channel() { return channel_.get(); } - TestService::Stub* get_stub() { return stub_.get(); } - - private: - std::shared_ptr channel_; - std::unique_ptr stub_; - }; - std::vector channels_; - std::vector> threads_; - std::unique_ptr timer_; + std::vector responses_; }; std::unique_ptr CreateSynchronousClient(const ClientConfig& config) { diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 97701d3d18f..c79661c4566 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -34,6 +34,8 @@ #ifndef TEST_QPS_CLIENT_H #define TEST_QPS_CLIENT_H +#include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.pb.h" namespace grpc { @@ -41,9 +43,125 @@ namespace testing { class Client { public: + explicit Client(const ClientConfig& config) : timer_(new Timer) { + for (int i = 0; i < config.client_channels(); i++) { + channels_.push_back(ClientChannelInfo( + config.server_targets(i % config.server_targets_size()), config)); + } + request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request_.set_response_size(config.payload_size()); + } virtual ~Client() {} - virtual ClientStats Mark() = 0; + ClientStats Mark() { + Histogram latencies; + std::vector to_merge(threads_.size()); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->BeginSwap(&to_merge[i]); + } + std::unique_ptr timer(new Timer); + timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(&to_merge[i]); + } + + auto timer_result = timer->Mark(); + + ClientStats stats; + latencies.FillProto(stats.mutable_latencies()); + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + + protected: + SimpleRequest request_; + + class ClientChannelInfo { + public: + explicit ClientChannelInfo(const grpc::string& target, + const ClientConfig& config) + : channel_(CreateTestChannel(target, config.enable_ssl())), + stub_(TestService::NewStub(channel_)) {} + ChannelInterface* get_channel() { return channel_.get(); } + TestService::Stub* get_stub() { return stub_.get(); } + + private: + std::shared_ptr channel_; + std::unique_ptr stub_; + }; + std::vector channels_; + + void StartThreads(size_t num_threads) { + for (size_t i = 0; i < num_threads; i++) { + threads_.emplace_back(new Thread(this, i)); + } + } + + void EndThreads() { + threads_.clear(); + } + + virtual void ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + + private: + class Thread { + public: + Thread(Client* client, size_t idx) + : done_(false), + new_(nullptr), + impl_([this, idx, client]() { + for (;;) { + // run the loop body + client->ThreadFunc(&histogram_, idx); + // lock, see if we're done + std::lock_guard g(mu_); + if (done_) return; + // also check if we're marking, and swap out the histogram if so + if (new_) { + new_->Swap(&histogram_); + new_ = nullptr; + cv_.notify_one(); + } + } + }) {} + + ~Thread() { + { + std::lock_guard g(mu_); + done_ = true; + } + impl_.join(); + } + + void BeginSwap(Histogram* n) { + std::lock_guard g(mu_); + new_ = n; + } + + void EndSwap() { + std::unique_lock g(mu_); + cv_.wait(g, [this]() { return new_ == nullptr; }); + } + + private: + Thread(const Thread&); + Thread& operator=(const Thread&); + + TestService::Stub* stub_; + ClientConfig config_; + std::mutex mu_; + std::condition_variable cv_; + bool done_; + Histogram* new_; + Histogram histogram_; + std::thread impl_; + }; + + std::vector> threads_; + std::unique_ptr timer_; }; std::unique_ptr CreateSynchronousClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ea9cfe8b9b..30b85afc299 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -49,50 +49,11 @@ #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/timer.h" +#include "test/cpp/qps/client.h" -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host."); -DEFINE_int32(client_threads, 4, "Number of client threads."); - -// We have a configurable number of channels for sending RPCs. -// RPCs are sent round-robin on the available channels by the -// various threads. Interesting cases are 1 global channel or -// 1 per-thread channel, but we can support any number. -// The channels are assigned round-robin on an RPC by RPC basis -// rather than just at initialization time in order to also measure the -// impact of cache thrashing caused by channel changes. This is an issue -// if you are not in one of the above "interesting cases" -DEFINE_int32(client_channels, 4, "Number of client channels."); - -DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); -DEFINE_int32(payload_size, 1, "Payload size in bytes"); - -// Alternatively, specify parameters for test as a workload so that multiple -// tests are initiated back-to-back. This is convenient for keeping a borg -// allocation consistent. This is a space-separated list of -// [threads channels num_rpcs payload_size ]* -DEFINE_string(workload, "", "Workload parameters"); - -using grpc::ChannelInterface; -using grpc::CreateTestChannel; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static double now() { - gpr_timespec tv = gpr_now(); - return 1e9 * tv.tv_sec + tv.tv_nsec; -} +namespace grpc { +namespace testing { class ClientRpcContext { public: @@ -103,8 +64,9 @@ class ClientRpcContext { static ClientRpcContext *detag(void *t) { return reinterpret_cast(t); } - virtual void report_stats(gpr_histogram *hist) = 0; + virtual void report_stats(Histogram *hist) = 0; }; + template class ClientRpcContextUnaryImpl : public ClientRpcContext { public: @@ -113,22 +75,22 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { const RequestType &req, std::function< std::unique_ptr>( - TestService::Stub *, grpc::ClientContext *, const RequestType &, - void *)> start_req, + TestService::Stub *, grpc::ClientContext *, const RequestType &, + void *)> start_req, std::function on_done) : context_(), - stub_(stub), + stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::ReqSent), callback_(on_done), - start_(now()), + start_(Timer::Now()), response_reader_( - start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} + start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} bool RunNextState() GRPC_OVERRIDE { return (this->*next_state_)(); } - void report_stats(gpr_histogram *hist) GRPC_OVERRIDE { - gpr_histogram_add(hist, now() - start_); + void report_stats(Histogram *hist) GRPC_OVERRIDE { + hist->Add((Timer::Now() - start_) * 1e9); } private: @@ -157,6 +119,64 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; +class AsyncClient GRPC_FINAL : public Client { + public: + explicit AsyncClient(const ClientConfig& config) : Client(config) { + for (int i = 0; i < config.async_client_threads(); i++) { + cli_cqs_.emplace_back(new CompletionQueue); + } + + auto payload_size = config.payload_size(); + auto check_done = [payload_size](grpc::Status s, SimpleResponse *response) { + GPR_ASSERT(s.IsOk() && (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast(payload_size))); + }; + + int t = 0; + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + for (auto& channel : channels_) { + auto *cq = cli_cqs_[t].get(); + t = (t + 1) % cli_cqs_.size(); + auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx, const SimpleRequest& request, void *tag) { + return stub->AsyncUnaryCall(ctx, request, cq, tag); + }; + + TestService::Stub* stub = channel.get_stub(); + const SimpleRequest& request = request_; + new ClientRpcContextUnaryImpl(stub, request, start_req, check_done); + } + } + + StartThreads(config.async_client_threads()); + } + + void ThreadFunc(Histogram *histogram, size_t thread_idx) { + void *got_tag; + bool ok; + cli_cqs_[thread_idx]->Next(&got_tag, &ok); + + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState() == false) { + // call the callback and then delete it + ctx->report_stats(histogram); + ctx->RunNextState(); + delete ctx; + } + } + + std::vector> cli_cqs_; +}; + +std::unique_ptr CreateAsyncClient(const ClientConfig& args) { + return std::unique_ptr(new AsyncClient(args)); +} + +} // namespace testing +} // namespace grpc + +#if 0 static void RunTest(const int client_threads, const int client_channels, const int num_rpcs, const int payload_size) { gpr_log(GPR_INFO, @@ -173,23 +193,7 @@ static void RunTest(const int client_threads, const int client_channels, std::ostringstream oss; oss << FLAGS_server_host << ":" << FLAGS_server_port; - class ClientChannelInfo { - public: - explicit ClientChannelInfo(const grpc::string &server) - : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), - stub_(TestService::NewStub(channel_)) {} - ChannelInterface *get_channel() { return channel_.get(); } - TestService::Stub *get_stub() { return stub_.get(); } - - private: - std::shared_ptr channel_; - std::unique_ptr stub_; - }; - - std::vector channels; - for (int i = 0; i < client_channels; i++) { - channels.push_back(ClientChannelInfo(oss.str())); - } + std::vector threads; // Will add threads when ready to execute std::vector< ::gpr_histogram *> thread_stats(client_threads); @@ -204,12 +208,6 @@ static void RunTest(const int client_threads, const int client_channels, grpc_profiler_start("qps_client_async.prof"); - auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { - GPR_ASSERT(s.IsOk() && (response->payload().type() == - grpc::testing::PayloadType::COMPRESSABLE) && - (response->payload().body().length() == - static_cast(payload_size))); - }; for (int i = 0; i < client_threads; i++) { gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); @@ -225,8 +223,6 @@ static void RunTest(const int client_threads, const int client_channels, request.set_response_size(payload_size); grpc::CompletionQueue cli_cq; - auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, _1, - _2, _3, &cli_cq, _4); int rpcs_sent = 0; while (rpcs_sent < num_rpcs) { @@ -339,3 +335,4 @@ int main(int argc, char **argv) { grpc_shutdown(); return 0; } +#endif diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 58656b6f881..7d73bb40d29 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -53,6 +53,7 @@ DEFINE_int32(outstanding_rpcs_per_channel, 1, DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); +DEFINE_int32(async_client_threads, 1, "Async client threads"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; @@ -84,6 +85,7 @@ int main(int argc, char **argv) { FLAGS_outstanding_rpcs_per_channel); client_config.set_client_channels(FLAGS_client_channels); client_config.set_payload_size(FLAGS_payload_size); + client_config.set_async_client_threads(FLAGS_async_client_threads); ServerConfig server_config; server_config.set_server_type(server_type); @@ -93,6 +95,8 @@ int main(int argc, char **argv) { auto result = RunScenario(client_config, FLAGS_num_clients, server_config, FLAGS_num_servers); + gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; })); + gpr_log(GPR_INFO, "Latencies (50/95/99/99.9%%-ile): %.1f/%.1f/%.1f/%.1f us", result.latencies.Percentile(50) / 1000, result.latencies.Percentile(95) / 1000, result.latencies.Percentile(99) / 1000, result.latencies.Percentile(99.9) / 1000); diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 6543e64a014..6a7170bf580 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -94,8 +94,11 @@ message ClientConfig { required int32 outstanding_rpcs_per_channel = 4; required int32 client_channels = 5; required int32 payload_size = 6; + // only for async client: + optional int32 async_client_threads = 7; } +// Request current stats message Mark {} message ClientArgs { diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index b5dbc1570db..4a2e798a477 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -80,7 +80,7 @@ std::unique_ptr CreateClient(const ClientConfig& config) { case ClientType::SYNCHRONOUS_CLIENT: return CreateSynchronousClient(config); case ClientType::ASYNC_CLIENT: - abort(); // return CreateAsyncClient(config); + return CreateAsyncClient(config); } abort(); }