diff --git a/test/cpp/qps/client.cc b/test/cpp/qps/client.cc index 11c39eb4f52..213e5d92149 100644 --- a/test/cpp/qps/client.cc +++ b/test/cpp/qps/client.cc @@ -33,48 +33,36 @@ #include #include +#include #include #include #include #include +#include + #include +#include #include #include +#include #include #include #include +#include +#include #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/qpstest.pb.h" -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host."); -DEFINE_int32(client_threads, 4, "Number of client threads."); - -// We have a configurable number of channels for sending RPCs. -// RPCs are sent round-robin on the available channels by the -// various threads. Interesting cases are 1 global channel or -// 1 per-thread channel, but we can support any number. -// The channels are assigned round-robin on an RPC by RPC basis -// rather than just at initialization time in order to also measure the -// impact of cache thrashing caused by channel changes. This is an issue -// if you are not in one of the above "interesting cases" -DEFINE_int32(client_channels, 4, "Number of client channels."); - -DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); -DEFINE_int32(payload_size, 1, "Payload size in bytes"); - -// Alternatively, specify parameters for test as a workload so that multiple -// tests are initiated back-to-back. This is convenient for keeping a borg -// allocation consistent. This is a space-separated list of -// [threads channels num_rpcs payload_size ]* -DEFINE_string(workload, "", "Workload parameters"); +DEFINE_int32(driver_port, 0, "Client driver port."); using grpc::ChannelInterface; using grpc::CreateTestChannel; -using grpc::testing::ServerStats; +using grpc::ServerBuilder; +using grpc::testing::ClientArgs; +using grpc::testing::ClientResult; +using grpc::testing::QpsClient; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StatsRequest; @@ -92,8 +80,11 @@ static double now() { return 1e9 * tv.tv_sec + tv.tv_nsec; } -void RunTest(const int client_threads, const int client_channels, - const int num_rpcs, const int payload_size) { +static bool got_sigint = false; + +static void sigint_handler(int x) { got_sigint = 1; } + +ClientResult RunTest(const ClientArgs& args) { gpr_log(GPR_INFO, "QPS test with parameters\n" "enable_ssl = %d\n" @@ -101,17 +92,14 @@ void RunTest(const int client_threads, const int client_channels, "client_threads = %d\n" "num_rpcs = %d\n" "payload_size = %d\n" - "server_host:server_port = %s:%d\n\n", - FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, - payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); - - std::ostringstream oss; - oss << FLAGS_server_host << ":" << FLAGS_server_port; + "server_target = %s\n\n", + args.enable_ssl(), args.client_channels(), args.client_threads(), args.num_rpcs(), + args.payload_size(), args.server_target().c_str()); class ClientChannelInfo { public: - explicit ClientChannelInfo(const grpc::string &server) - : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), + explicit ClientChannelInfo(const ClientArgs& args) + : channel_(CreateTestChannel(args.server_target(), args.enable_ssl())), stub_(TestService::NewStub(channel_)) {} ChannelInterface *get_channel() { return channel_.get(); } TestService::Stub *get_stub() { return stub_.get(); } @@ -122,38 +110,33 @@ void RunTest(const int client_threads, const int client_channels, }; std::vector channels; - for (int i = 0; i < client_channels; i++) { - channels.push_back(ClientChannelInfo(oss.str())); + for (int i = 0; i < args.client_channels(); i++) { + channels.push_back(ClientChannelInfo(args)); } std::vector threads; // Will add threads when ready to execute - std::vector< ::gpr_histogram *> thread_stats(client_threads); + std::vector< ::gpr_histogram *> thread_stats(args.client_threads()); - TestService::Stub *stub_stats = channels[0].get_stub(); grpc::ClientContext context_stats_begin; - StatsRequest stats_request; - ServerStats server_stats_begin; - stats_request.set_test_num(0); - grpc::Status status_beg = stub_stats->CollectServerStats( - &context_stats_begin, stats_request, &server_stats_begin); grpc_profiler_start("qps_client.prof"); - for (int i = 0; i < client_threads; i++) { + gpr_timespec start = gpr_now(); + + for (int i = 0; i < args.client_threads(); i++) { gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); GPR_ASSERT(hist != NULL); thread_stats[i] = hist; threads.push_back( - std::thread([hist, client_threads, client_channels, num_rpcs, - payload_size, &channels](int channel_num) { + std::thread([hist, args, &channels](int channel_num) { SimpleRequest request; SimpleResponse response; request.set_response_type( grpc::testing::PayloadType::COMPRESSABLE); - request.set_response_size(payload_size); + request.set_response_size(args.payload_size()); - for (int j = 0; j < num_rpcs; j++) { + for (int j = 0; j < args.num_rpcs(); j++) { TestService::Stub *stub = channels[channel_num].get_stub(); double start = now(); @@ -166,26 +149,29 @@ void RunTest(const int client_threads, const int client_channels, (response.payload().type() == grpc::testing::PayloadType::COMPRESSABLE) && (response.payload().body().length() == - static_cast(payload_size))); + static_cast(args.payload_size()))); // Now do runtime round-robin assignment of the next // channel number - channel_num += client_threads; - channel_num %= client_channels; + channel_num += args.client_threads(); + channel_num %= args.client_channels(); } }, - i % client_channels)); + i % args.client_channels())); } - gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); - GPR_ASSERT(hist != NULL); for (auto &t : threads) { t.join(); } + gpr_timespec stop = gpr_now(); + grpc_profiler_stop(); - for (int i = 0; i < client_threads; i++) { + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + + for (int i = 0; i < args.client_threads(); i++) { gpr_histogram *h = thread_stats[i]; gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), @@ -195,57 +181,54 @@ void RunTest(const int client_threads, const int client_channels, gpr_histogram_destroy(h); } - gpr_log( - GPR_INFO, - "latency across %d threads with %d channels and %d payload " - "(50/90/95/99/99.9): %f / %f / %f / %f / %f", - client_threads, client_channels, payload_size, - gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), - gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), - gpr_histogram_percentile(hist, 99.9)); + ClientResult result; + auto* latencies = result.mutable_latencies(); + latencies->set_l_50(gpr_histogram_percentile(hist, 50)); + latencies->set_l_90(gpr_histogram_percentile(hist, 90)); + latencies->set_l_99(gpr_histogram_percentile(hist, 99)); + latencies->set_l_999(gpr_histogram_percentile(hist, 99.9)); + gpr_timespec elapsed = gpr_time_sub(stop, start); + result.set_num_rpcs(args.client_threads() * args.num_rpcs()); + result.set_time_elapsed(elapsed.tv_sec + 1e-9 * elapsed.tv_nsec); + gpr_histogram_destroy(hist); - grpc::ClientContext context_stats_end; - ServerStats server_stats_end; - grpc::Status status_end = stub_stats->CollectServerStats( - &context_stats_end, stats_request, &server_stats_end); + return result; +} - double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); - int total_rpcs = client_threads * num_rpcs; - double utime = server_stats_end.time_user() - server_stats_begin.time_user(); - double stime = - server_stats_end.time_system() - server_stats_begin.time_system(); - gpr_log(GPR_INFO, - "Elapsed time: %.3f\n" - "RPC Count: %d\n" - "QPS: %.3f\n" - "System time: %.3f\n" - "User time: %.3f\n" - "Resource usage: %.1f%%\n", - elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, - (stime + utime) / elapsed * 100.0); +class ClientImpl : public QpsClient::Service { + public: + + private: + std::mutex client_mu_; +}; + +static void RunServer() { + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_driver_port); + + ClientImpl service; + + ServerBuilder builder; + builder.AddPort(server_address); + builder.RegisterService(&service); + + gpr_free(server_address); + + auto server = builder.BuildAndStart(); + + while (!got_sigint) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } } int main(int argc, char **argv) { grpc_init(); ParseCommandLineFlags(&argc, &argv, true); - GPR_ASSERT(FLAGS_server_port); - - if (FLAGS_workload.length() == 0) { - RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, - FLAGS_payload_size); - } else { - std::istringstream workload(FLAGS_workload); - int client_threads, client_channels, num_rpcs, payload_size; - workload >> client_threads; - while (!workload.eof()) { - workload >> client_channels >> num_rpcs >> payload_size; - RunTest(client_threads, client_channels, num_rpcs, payload_size); - workload >> client_threads; - } - gpr_log(GPR_INFO, "Done with specified workload."); - } + signal(SIGINT, sigint_handler); + + RunServer(); grpc_shutdown(); return 0; diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 68ec6149f59..fc589600176 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -78,17 +78,24 @@ message Latencies { required double l_999 = 4; } -message StartArgs { - required string server_host = 1; - required int32 server_port = 2; - optional bool enable_ssl = 3 [default = false]; - optional int32 client_threads = 4 [default = 1]; - optional int32 client_channels = 5 [default = -1]; - optional int32 num_rpcs = 6 [default = 1]; - optional int32 payload_size = 7 [default = 1]; +message ClientArgs { + required string server_target = 1; + required bool enable_ssl = 3; + required int32 client_threads = 4; + // We have a configurable number of channels for sending RPCs. + // RPCs are sent round-robin on the available channels by the + // various threads. Interesting cases are 1 global channel or + // 1 per-thread channel, but we can support any number. + // The channels are assigned round-robin on an RPC by RPC basis + // rather than just at initialization time in order to also measure the + // impact of cache thrashing caused by channel changes. This is an issue + // if you are not in one of the above "interesting cases" + required int32 client_channels = 5; + required int32 num_rpcs = 6; + required int32 payload_size = 7; } -message StartResult { +message ClientResult { required Latencies latencies = 1; required int32 num_rpcs = 2; required double time_elapsed = 3; @@ -96,6 +103,14 @@ message StartResult { required double time_system = 5; } +message ServerArgs { + required int32 threads = 1; +} + +message ServerStatus { + optional ServerStats stats = 1; +} + message SimpleRequest { // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. @@ -153,9 +168,6 @@ message StreamingOutputCallResponse { } service TestService { - // Start test with specified workload - rpc StartTest(StartArgs) returns (Latencies); - // Collect stats from server, ignore request content rpc CollectServerStats(StatsRequest) returns (ServerStats); @@ -186,3 +198,13 @@ service TestService { rpc HalfDuplexCall(stream StreamingOutputCallRequest) returns (stream StreamingOutputCallResponse); } + +service QpsClient { + // Start test with specified workload + rpc RunTest(ClientArgs) returns (ClientResult); +} + +service QpsServer { + // Start test with specified workload + rpc RunServer(stream ServerArgs) returns (stream ServerStatus); +} diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 8e136349a15..a5edb054938 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -44,6 +44,7 @@ #include #include #include +#include #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" @@ -51,13 +52,13 @@ #include #include -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +DEFINE_int32(driver_port, 0, "Server driver port."); using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; +using grpc::ServerReaderWriter; using grpc::ThreadPool; using grpc::testing::Payload; using grpc::testing::PayloadType; @@ -66,6 +67,10 @@ using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StatsRequest; using grpc::testing::TestService; +using grpc::testing::QpsServer; +using grpc::testing::ServerArgs; +using grpc::testing::ServerStats; +using grpc::testing::ServerStatus; using grpc::Status; // In some distros, gflags is in the namespace google, and in some others, @@ -124,34 +129,76 @@ class TestServiceImpl final : public TestService::Service { } // namespace +class ServerImpl : public QpsServer::Service { + public: + Status RunServer(ServerContext* ctx, ServerReaderWriter* stream) { + ServerArgs args; + std::unique_ptr last_stats; + if (!stream->Read(&args)) return Status::OK; + + bool done = false; + while (!done) { + std::lock_guard lock(server_mu_); + + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_port); + + TestServiceImpl service; + + ServerBuilder builder; + builder.AddPort(server_address); + builder.RegisterService(&service); + + gpr_free(server_address); + + std::unique_ptr pool(new ThreadPool(args.threads())); + builder.SetThreadPool(pool.get()); + + auto server = builder.BuildAndStart(); + gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + + ServerStatus last_status; + if (last_stats.get()) { + *last_status.mutable_stats() = *last_stats; + } + if (!stream->Write(last_status)) return Status(grpc::UNKNOWN); + + grpc_profiler_start("qps_server.prof"); + + done = stream->Read(&args); + + grpc_profiler_stop(); + } + + ServerStatus last_status; + if (last_stats.get()) { + *last_status.mutable_stats() = *last_stats; + } + stream->Write(last_status); + return Status::OK; + } + + private: + std::mutex server_mu_; +}; + static void RunServer() { char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); - - TestServiceImpl service; + gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - SimpleRequest request; - SimpleResponse response; + ServerImpl service; ServerBuilder builder; builder.AddPort(server_address); builder.RegisterService(&service); - std::unique_ptr pool(new ThreadPool(FLAGS_server_threads)); - builder.SetThreadPool(pool.get()); - - std::unique_ptr server(builder.BuildAndStart()); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + gpr_free(server_address); - grpc_profiler_start("qps_server.prof"); + auto server = builder.BuildAndStart(); while (!got_sigint) { std::this_thread::sleep_for(std::chrono::seconds(5)); } - - grpc_profiler_stop(); - - gpr_free(server_address); } int main(int argc, char** argv) { @@ -161,7 +208,6 @@ int main(int argc, char** argv) { signal(SIGINT, sigint_handler); GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); RunServer(); grpc_shutdown();