From 32baa5e62209b67ff247e4a03803c70cb364b457 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 10:01:41 -0800 Subject: [PATCH] Allow dynamic sizing of async client and server thread pools --- test/cpp/qps/client_async.cc | 62 ++++++++++++++++++++++++------------ test/cpp/qps/server_async.cc | 38 ++++++++++++++-------- 2 files changed, 65 insertions(+), 35 deletions(-) diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f270cd09875..13e71fec7d9 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -46,13 +46,14 @@ #include #include #include +#include #include #include +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { @@ -93,7 +94,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + CompletionQueue*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -139,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function callback_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + CompletionQueue*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -158,20 +161,22 @@ class AsyncClient : public ClientImpl { using Client::closed_loop_; using ClientImpl::channels_; using ClientImpl::request_; - AsyncClient(const ClientConfig& config, - std::function setup_ctx, - std::function(std::shared_ptr)> - create_stub) + AsyncClient( + const ClientConfig& config, + std::function + setup_ctx, + std::function(std::shared_ptr)> + create_stub) : ClientImpl(config, create_stub), + num_async_threads_(NumThreads(config)), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - SetupLoadTest(config, config.async_client_threads()); + pref_channel_inc_(num_async_threads_) { + SetupLoadTest(config, num_async_threads_); - for (int i = 0; i < config.async_client_threads(); i++) { + for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); if (!closed_loop_) { rpc_deadlines_.emplace_back(); @@ -324,6 +329,9 @@ class AsyncClient : public ClientImpl { return true; } + protected: + int num_async_threads_; + private: class boolean { // exists only to avoid data-race on vector public: @@ -338,6 +346,15 @@ class AsyncClient : public ClientImpl { private: bool val_; }; + static int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = gpr_cpu_num_cores(); + gpr_log(GPR_INFO, "Sizing client server to %d threads\n", num_threads); + } + return num_threads; + } + std::vector> cli_cqs_; std::vector rpc_deadlines_; // per thread deadlines @@ -363,7 +380,7 @@ class AsyncUnaryClient GRPC_FINAL public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -391,7 +408,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -443,10 +461,10 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; - std::function< - std::unique_ptr>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req_; + std::function>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -461,7 +479,7 @@ class AsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } @@ -490,7 +508,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, std::function( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> start_req, + const grpc::string& method_name, CompletionQueue*, void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -547,7 +566,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function callback_; std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, - CompletionQueue*, void*)> start_req_; + CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr stream_; @@ -566,7 +586,7 @@ class GenericAsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index cd8d546c283..8e1503b4604 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -50,8 +50,8 @@ #include #include -#include "test/cpp/qps/server.h" #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/cpp/qps/server.h" namespace grpc { namespace testing { @@ -72,7 +72,8 @@ class AsyncQpsServerTest : public Server { CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function process_rpc) + ResponseType *)> + process_rpc) : Server(config) { char *server_address = NULL; @@ -85,7 +86,13 @@ class AsyncQpsServerTest : public Server { register_service(&builder, &async_service_); - for (int i = 0; i < config.async_server_threads(); i++) { + int num_threads = config.async_server_threads(); + if (num_threads <= 0) { // dynamic sizing + num_threads = cores(); + gpr_log(GPR_INFO, "Sizing async server to %d threads\n", num_threads); + } + + for (int i = 0; i < num_threads; i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -96,8 +103,8 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / config.async_server_threads(); i++) { - for (int j = 0; j < config.async_server_threads(); j++) { + for (int i = 0; i < 10000 / num_threads; i++) { + for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = std::bind(request_unary_function, &async_service_, _1, _2, _3, @@ -115,10 +122,10 @@ class AsyncQpsServerTest : public Server { } } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } @@ -184,7 +191,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function *, - void *)> request_method, + void *)> + request_method, std::function invoke_method) : srv_ctx_(new ServerContextType), @@ -381,12 +389,14 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, } std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest< - SimpleRequest, SimpleResponse, BenchmarkService::AsyncService, - grpc::ServerContext>( - config, RegisterBenchmarkService, - &BenchmarkService::AsyncService::RequestUnaryCall, - &BenchmarkService::AsyncService::RequestStreamingCall, ProcessSimpleRPC)); + return std::unique_ptr( + new AsyncQpsServerTest( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, + ProcessSimpleRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { return std::unique_ptr(