From dea740f3297ea78af9d5b76f89aedee869a74963 Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 26 Feb 2015 16:35:35 -0800 Subject: [PATCH] New multithreaded async C++ tests. The server is architected the way that it should be with multiple threads waiting on a single completion queue. The client currently uses a separate completion queue per-thread, as trying to do a single unified queue was leading to crashes for me. I need to figure that out. --- test/cpp/qps/client_async.cc | 340 +++++++++++++++++++++++++++++++++++ test/cpp/qps/server_async.cc | 296 ++++++++++++++++++++++++++++++ 2 files changed, 636 insertions(+) create mode 100644 test/cpp/qps/client_async.cc create mode 100644 test/cpp/qps/server_async.cc diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc new file mode 100644 index 00000000000..13db4febae5 --- /dev/null +++ b/test/cpp/qps/client_async.cc @@ -0,0 +1,340 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/qpstest.pb.h" + +DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); +DEFINE_int32(server_port, 0, "Server port."); +DEFINE_string(server_host, "127.0.0.1", "Server host."); +DEFINE_int32(client_threads, 4, "Number of client threads."); + +// We have a configurable number of channels for sending RPCs. +// RPCs are sent round-robin on the available channels by the +// various threads. Interesting cases are 1 global channel or +// 1 per-thread channel, but we can support any number. +// The channels are assigned round-robin on an RPC by RPC basis +// rather than just at initialization time in order to also measure the +// impact of cache thrashing caused by channel changes. This is an issue +// if you are not in one of the above "interesting cases" +DEFINE_int32(client_channels, 4, "Number of client channels."); + +DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); +DEFINE_int32(payload_size, 1, "Payload size in bytes"); + +// Alternatively, specify parameters for test as a workload so that multiple +// tests are initiated back-to-back. This is convenient for keeping a borg +// allocation consistent. This is a space-separated list of +// [threads channels num_rpcs payload_size ]* +DEFINE_string(workload, "", "Workload parameters"); + +using grpc::ChannelInterface; +using grpc::CreateTestChannel; +using grpc::testing::ServerStats; +using grpc::testing::SimpleRequest; +using grpc::testing::SimpleResponse; +using grpc::testing::StatsRequest; +using grpc::testing::TestService; + +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google { } +namespace gflags { } +using namespace google; +using namespace gflags; + +static double now() { + gpr_timespec tv = gpr_now(); + return 1e9 * tv.tv_sec + tv.tv_nsec; +} + + class ClientRpcContext { + public: + ClientRpcContext() {} + virtual ~ClientRpcContext() {} + virtual bool operator()() = 0; // do next state, return false if steps done + static void *tag(ClientRpcContext *c) {return reinterpret_cast(c);} + static ClientRpcContext *detag(void *t) { + return reinterpret_cast(t); + } + virtual void report_stats(gpr_histogram *hist) = 0; + }; + template + class ClientRpcContextUnaryImpl : public ClientRpcContext { + public: + ClientRpcContextUnaryImpl(const RequestType& req, + std::function *(grpc::ClientContext *, + const RequestType&, void *)> start_req, + std::function on_done): + context_(), req_(req), response_(), + next_state_(&ClientRpcContextUnaryImpl::ReqSent), + callback_(on_done), + start_(now()), + response_reader_(start_req(&context_, req_, + ClientRpcContext::tag(this))) { + } + ~ClientRpcContextUnaryImpl() override {} + bool operator()() override {return (this->*next_state_)();} + void report_stats(gpr_histogram *hist) override { + gpr_histogram_add(hist, now()-start_); + } + private: + bool ReqSent() { + next_state_ = &ClientRpcContextUnaryImpl::RespDone; + response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); + return true; + } + bool RespDone() { + next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; + return false; + } + bool DoCallBack() { + callback_(status_, &response_); + return false; + } + grpc::ClientContext context_; + RequestType req_; + ResponseType response_; + bool (ClientRpcContextUnaryImpl::*next_state_)(); + std::function callback_; + grpc::Status status_; + double start_; + std::unique_ptr> response_reader_; + }; + +static void RunTest(const int client_threads, const int client_channels, + const int num_rpcs, const int payload_size) { + gpr_log(GPR_INFO, + "QPS test with parameters\n" + "enable_ssl = %d\n" + "client_channels = %d\n" + "client_threads = %d\n" + "num_rpcs = %d\n" + "payload_size = %d\n" + "server_host:server_port = %s:%d\n\n", + FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, + payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); + + std::ostringstream oss; + oss << FLAGS_server_host << ":" << FLAGS_server_port; + + class ClientChannelInfo { + public: + explicit ClientChannelInfo(const grpc::string &server) + : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), + stub_(TestService::NewStub(channel_)) {} + ChannelInterface *get_channel() { return channel_.get(); } + TestService::Stub *get_stub() { return stub_.get(); } + + private: + std::shared_ptr channel_; + std::unique_ptr stub_; + }; + + std::vector channels; + for (int i = 0; i < client_channels; i++) { + channels.push_back(ClientChannelInfo(oss.str())); + } + + std::vector threads; // Will add threads when ready to execute + std::vector<::gpr_histogram *> thread_stats(client_threads); + + TestService::Stub *stub_stats = channels[0].get_stub(); + grpc::ClientContext context_stats_begin; + StatsRequest stats_request; + ServerStats server_stats_begin; + stats_request.set_test_num(0); + grpc::Status status_beg = stub_stats->CollectServerStats( + &context_stats_begin, stats_request, &server_stats_begin); + + grpc_profiler_start("qps_client_async.prof"); + + auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { + GPR_ASSERT(s.IsOk() && + (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast(payload_size))); + }; + + for (int i = 0; i < client_threads; i++) { + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + thread_stats[i] = hist; + + threads.push_back( + std::thread([hist, client_threads, client_channels, num_rpcs, + payload_size, &channels, &CheckDone](int channel_num) { + using namespace std::placeholders; + SimpleRequest request; + request.set_response_type( + grpc::testing::PayloadType::COMPRESSABLE); + request.set_response_size(payload_size); + + grpc::CompletionQueue cli_cq; + + int rpcs_sent=0; + while (rpcs_sent < num_rpcs) { + rpcs_sent++; + TestService::Stub *stub = + channels[channel_num].get_stub(); + grpc::ClientContext context; + auto start_req = std::bind(static_cast*(TestService::Stub::*)(grpc::ClientContext *,const SimpleRequest &,grpc::CompletionQueue *,void *)> + (&TestService::Stub::UnaryCall), + stub, _1, _2, &cli_cq, _3); + new ClientRpcContextUnaryImpl(request, + start_req, + CheckDone); + void *got_tag; + bool ok; + + // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp done) + cli_cq.Next(&got_tag,&ok); + if (!ok) + break; + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + (*ctx)(); + delete ctx; + } + cli_cq.Next(&got_tag,&ok); + if (!ok) + break; + ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + ctx->report_stats(hist); + (*ctx)(); + delete ctx; + } + // Now do runtime round-robin assignment of the next + // channel number + channel_num += client_threads; + channel_num %= client_channels; + } + }, + i % client_channels)); + } + + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + for (auto &t : threads) { + t.join(); + } + + grpc_profiler_stop(); + + for (int i = 0; i < client_threads; i++) { + gpr_histogram *h = thread_stats[i]; + gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", + i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), + gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), + gpr_histogram_percentile(h, 99.9)); + gpr_histogram_merge(hist, h); + gpr_histogram_destroy(h); + } + + gpr_log( + GPR_INFO, + "latency across %d threads with %d channels and %d payload " + "(50/90/95/99/99.9): %f / %f / %f / %f / %f", + client_threads, client_channels, payload_size, + gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), + gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), + gpr_histogram_percentile(hist, 99.9)); + gpr_histogram_destroy(hist); + + grpc::ClientContext context_stats_end; + ServerStats server_stats_end; + grpc::Status status_end = stub_stats->CollectServerStats( + &context_stats_end, stats_request, &server_stats_end); + + double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); + int total_rpcs = client_threads * num_rpcs; + double utime = server_stats_end.time_user() - server_stats_begin.time_user(); + double stime = + server_stats_end.time_system() - server_stats_begin.time_system(); + gpr_log(GPR_INFO, + "Elapsed time: %.3f\n" + "RPC Count: %d\n" + "QPS: %.3f\n" + "System time: %.3f\n" + "User time: %.3f\n" + "Resource usage: %.1f%%\n", + elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, + (stime + utime) / elapsed * 100.0); +} + +int main(int argc, char **argv) { + grpc_init(); + ParseCommandLineFlags(&argc, &argv, true); + + GPR_ASSERT(FLAGS_server_port); + + if (FLAGS_workload.length() == 0) { + RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, + FLAGS_payload_size); + } else { + std::istringstream workload(FLAGS_workload); + int client_threads, client_channels, num_rpcs, payload_size; + workload >> client_threads; + while (!workload.eof()) { + workload >> client_channels >> num_rpcs >> payload_size; + RunTest(client_threads, client_channels, num_rpcs, payload_size); + workload >> client_threads; + } + gpr_log(GPR_INFO, "Done with specified workload."); + } + + grpc_shutdown(); + return 0; +} diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc new file mode 100644 index 00000000000..fec17ea79fa --- /dev/null +++ b/test/cpp/qps/server_async.cc @@ -0,0 +1,296 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "src/cpp/server/thread_pool.h" +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/qps/qpstest.pb.h" + +#include +#include + +DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); +DEFINE_int32(port, 0, "Server port."); +DEFINE_int32(server_threads, 4, "Number of server threads."); + +using grpc::CompletionQueue; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ThreadPool; +using grpc::testing::Payload; +using grpc::testing::PayloadType; +using grpc::testing::ServerStats; +using grpc::testing::SimpleRequest; +using grpc::testing::SimpleResponse; +using grpc::testing::StatsRequest; +using grpc::testing::TestService; +using grpc::Status; + +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google {} +namespace gflags {} +using namespace google; +using namespace gflags; + +static bool got_sigint = false; + +static void sigint_handler(int x) { got_sigint = 1; } + +static double time_double(struct timeval *tv) { + return tv->tv_sec + 1e-6 * tv->tv_usec; +} + +static bool SetPayload(PayloadType type, int size, Payload *payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr body(new char[size]()); + payload->set_body(body.get(), size); + return true; +} + +namespace { + +class AsyncQpsServerTest { + public: + AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + char *server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_port); + + ServerBuilder builder; + builder.AddPort(server_address); + + builder.RegisterAsyncService(&async_service_); + + server_ = builder.BuildAndStart(); + gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + gpr_free(server_address); + + using namespace std::placeholders; + request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, + &async_service_, _1, _2, _3, &srv_cq_, _4); + request_stats_ = + std::bind(&TestService::AsyncService::RequestCollectServerStats, + &async_service_, _1, _2, _3, &srv_cq_, _4); + for (int i = 0; i < 100; i++) { + contexts_.push_front(new ServerRpcContextUnaryImpl(request_unary_, UnaryCall)); + contexts_.push_front(new ServerRpcContextUnaryImpl(request_stats_, CollectServerStats)); + } + } + ~AsyncQpsServerTest() { + server_->Shutdown(); + void *ignored_tag; + bool ignored_ok; + srv_cq_.Shutdown(); + while (srv_cq_.Next(&ignored_tag, &ignored_ok)) { + } + while (!contexts_.empty()) { + delete contexts_.front(); + contexts_.pop_front(); + } + } + void ServeRpcs(int num_threads) { + std::vector threads; + for (int i = 0; i < num_threads; i++) { + threads.push_back(std::thread([=]() { + // Wait until work is available or we are shutting down + bool ok; + void *got_tag; + while (srv_cq_.Next(&got_tag, &ok)) { + EXPECT_EQ(ok, true); + ServerRpcContext *ctx = detag(got_tag); + // The tag is a pointer to an RPC context to invoke + if ((*ctx)() == false) { + // this RPC context is done, so refresh it + ctx->refresh(); + } + } + return; + })); + } + while (!got_sigint) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + } + private: + class ServerRpcContext { + public: + ServerRpcContext() {} + virtual ~ServerRpcContext() {}; + virtual bool operator()() = 0; // do next state, return false if all done + virtual void refresh() = 0; // start this back at a clean state + }; + static void *tag(ServerRpcContext *func) { + return reinterpret_cast(func); + } + static ServerRpcContext *detag(void *tag) { + return reinterpret_cast(tag); + } + + template + class ServerRpcContextUnaryImpl : public ServerRpcContext { + public: + ServerRpcContextUnaryImpl( + std::function *, + void *)> request_method, + std::function + invoke_method) + : next_state_(&ServerRpcContextUnaryImpl::invoker), + request_method_(request_method), + invoke_method_(invoke_method), + response_writer_(&srv_ctx_) { + request_method_(&srv_ctx_, &req_, &response_writer_, + AsyncQpsServerTest::tag(this)); + } + ~ServerRpcContextUnaryImpl() override {} + bool operator()() override {return (this->*next_state_)();} + void refresh() override { + srv_ctx_ = ServerContext(); + req_ = RequestType(); + response_writer_ = + grpc::ServerAsyncResponseWriter(&srv_ctx_); + + // Then request the method + next_state_ = &ServerRpcContextUnaryImpl::invoker; + request_method_(&srv_ctx_, &req_, &response_writer_, + AsyncQpsServerTest::tag(this)); + } + private: + bool finisher() {return false;} + bool invoker() { + ResponseType response; + + // Call the RPC processing function + grpc::Status status = invoke_method_(&req_, &response); + + // Have the response writer work and invoke on_finish when done + next_state_ = &ServerRpcContextUnaryImpl::finisher; + response_writer_.Finish(response, status, + AsyncQpsServerTest::tag(this)); + return true; + } + ServerContext srv_ctx_; + RequestType req_; + bool (ServerRpcContextUnaryImpl::*next_state_)(); + std::function *, void *)> + request_method_; + std::function + invoke_method_; + grpc::ServerAsyncResponseWriter response_writer_; + }; + + static Status CollectServerStats(const StatsRequest *, + ServerStats *response) { + struct rusage usage; + struct timeval tv; + gettimeofday(&tv, NULL); + getrusage(RUSAGE_SELF, &usage); + response->set_time_now(time_double(&tv)); + response->set_time_user(time_double(&usage.ru_utime)); + response->set_time_system(time_double(&usage.ru_stime)); + return Status::OK; + } + static Status UnaryCall(const SimpleRequest *request, + SimpleResponse *response) { + if (request->has_response_size() && request->response_size() > 0) { + if (!SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; + } + CompletionQueue srv_cq_; + TestService::AsyncService async_service_; + std::unique_ptr server_; + std::function *, void *)> + request_unary_; + std::function *, void *)> + request_stats_; + std::forward_list contexts_; +}; + +} // namespace + +static void RunServer() { + AsyncQpsServerTest server; + + grpc_profiler_start("qps_server_async.prof"); + + server.ServeRpcs(FLAGS_server_threads); + + grpc_profiler_stop(); +} + +int main(int argc, char **argv) { + grpc_init(); + ParseCommandLineFlags(&argc, &argv, true); + GPR_ASSERT(FLAGS_port != 0); + GPR_ASSERT(!FLAGS_enable_ssl); + + signal(SIGINT, sigint_handler); + + RunServer(); + + grpc_shutdown(); + google::protobuf::ShutdownProtobufLibrary(); + + return 0; +}