Merge pull request #11483 from kpayson64/threads_per_cq_rebased

Support threads per CQ
pull/11327/head
kpayson64 8 years ago committed by GitHub
commit f24ab98a9f
  1. 6
      src/proto/grpc/testing/control.proto
  2. 43
      test/cpp/qps/client_async.cc
  3. 18
      test/cpp/qps/server_async.cc
  4. 1252
      tools/run_tests/generated/tests.json
  5. 70
      tools/run_tests/performance/scenario_config.py

@ -102,6 +102,9 @@ message ClientConfig {
repeated ChannelArg channel_args = 16;
// Number of threads that share each completion queue
int32 threads_per_cq = 17;
// Number of messages on a stream before it gets finished/restarted
int32 messages_per_stream = 18;
}
@ -142,6 +145,9 @@ message ServerConfig {
// If we use an OTHER_SERVER client_type, this string gives more detail
string other_server_api = 11;
// Number of threads that share each completion queue
int32 threads_per_cq = 12;
// c++-only options (for now) --------------------------------
// Buffer pool size (no buffer pool specified if unset)

@ -55,6 +55,11 @@ class ClientRpcContext {
}
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
void lock() { mu_.lock(); }
void unlock() { mu_.unlock(); }
private:
std::mutex mu_;
};
template <class RequestType, class ResponseType>
@ -106,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
start_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@ -163,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
num_async_threads_(NumThreads(config)) {
SetupLoadTest(config, num_async_threads_);
for (int i = 0; i < num_async_threads_; i++) {
int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
for (int i = 0; i < num_cqs; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
}
for (int i = 0; i < num_async_threads_; i++) {
cq_.emplace_back(i % cli_cqs_.size());
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
@ -231,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
// We want to delete the context. However, it is possible that
// another thread that just initiated an action on this
// context still has its lock even though the action on the
// context has completed. To delay for that, just grab the
// lock for serialization. Take a new scope.
{ std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx;
return true;
} else if (!ctx->RunNextState(ok, entry)) {
}
bool del = false;
// Create a new scope for a lock_guard'ed region
{
std::lock_guard<ClientRpcContext> lctx(*ctx);
if (!ctx->RunNextState(ok, entry)) {
// The RPC and callback are done, so clone the ctx
// and kickstart the new one
ctx->StartNewClone(cli_cqs_[thread_idx].get());
// delete the old version
ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
// set the old version to delete
del = true;
}
}
if (del) {
delete ctx;
}
return true;
@ -255,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<int> cq_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
@ -377,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, start_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
@ -515,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl(
stub_, req_, next_issue_, start_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@ -632,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl(
stub_, req_, next_issue_, start_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@ -774,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl(
stub_, req_, next_issue_, start_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}

@ -16,6 +16,7 @@
*
*/
#include <algorithm>
#include <forward_list>
#include <functional>
#include <memory>
@ -89,9 +90,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
}
for (int i = 0; i < num_threads; i++) {
int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
for (int i = 0; i < num_cqs; i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
for (int i = 0; i < num_threads; i++) {
cq_.emplace_back(i % srv_cqs_.size());
}
if (config.resource_quota_size() > 0) {
builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
@ -105,7 +111,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::placeholders::_2);
for (int i = 0; i < 5000; i++) {
for (int j = 0; j < num_threads; j++) {
for (int j = 0; j < num_cqs; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
request_unary_function, &async_service_, std::placeholders::_1,
@ -190,7 +196,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
@ -199,6 +205,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (shutdown_state_[thread_idx]->shutdown) {
return;
}
std::lock_guard<ServerRpcContext> l2(*ctx);
const bool still_going = ctx->RunNextState(ok);
// if this RPC context is done, refresh it
if (!still_going) {
@ -211,9 +218,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
class ServerRpcContext {
public:
ServerRpcContext() {}
void lock() { mu_.lock(); }
void unlock() { mu_.unlock(); }
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
virtual void Reset() = 0; // start this back at a clean state
private:
std::mutex mu_;
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@ -503,6 +514,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
std::vector<int> cq_;
ServiceType async_service_;
std::vector<std::unique_ptr<ServerRpcContext>> contexts_;

File diff suppressed because it is too large Load Diff

@ -38,7 +38,7 @@ HISTOGRAM_PARAMS = {
# actual target will be slightly higher)
OUTSTANDING_REQUESTS={
'async': 6400,
'async-1core': 800,
'async-limited': 800,
'sync': 1000
}
@ -93,6 +93,8 @@ def _ping_pong_scenario(name, rpc_type,
client_language=None,
server_language=None,
async_server_threads=0,
server_threads_per_cq=0,
client_threads_per_cq=0,
warmup_seconds=WARMUP_SECONDS,
categories=DEFAULT_CATEGORIES,
channels=None,
@ -112,6 +114,7 @@ def _ping_pong_scenario(name, rpc_type,
'outstanding_rpcs_per_channel': 1,
'client_channels': 1,
'async_client_threads': 1,
'threads_per_cq': client_threads_per_cq,
'rpc_type': rpc_type,
'load_params': {
'closed_loop': {}
@ -122,6 +125,7 @@ def _ping_pong_scenario(name, rpc_type,
'server_type': server_type,
'security_params': _get_secargs(secure),
'async_server_threads': async_server_threads,
'threads_per_cq': server_threads_per_cq,
},
'warmup_seconds': warmup_seconds,
'benchmark_seconds': BENCHMARK_SECONDS
@ -265,12 +269,72 @@ class CXXLanguage:
secure=secure,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_unconstrained_1cq_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
unconstrained_client='async-limited', use_generic_payload=True,
secure=secure,
client_threads_per_cq=1000000, server_threads_per_cq=1000000,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_unconstrained_2waysharedcq_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
unconstrained_client='async', use_generic_payload=True,
secure=secure,
client_threads_per_cq=2, server_threads_per_cq=2,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_protobuf_async_streaming_qps_unconstrained_1cq_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_SERVER',
unconstrained_client='async-limited',
secure=secure,
client_threads_per_cq=1000000, server_threads_per_cq=1000000,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_protobuf_async_streaming_qps_unconstrained_2waysharedcq_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_SERVER',
unconstrained_client='async',
secure=secure,
client_threads_per_cq=2, server_threads_per_cq=2,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_protobuf_async_unary_qps_unconstrained_1cq_%s' % secstr,
rpc_type='UNARY',
client_type='ASYNC_CLIENT',
server_type='ASYNC_SERVER',
unconstrained_client='async-limited',
secure=secure,
client_threads_per_cq=1000000, server_threads_per_cq=1000000,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_protobuf_async_unary_qps_unconstrained_2waysharedcq_%s' % secstr,
rpc_type='UNARY',
client_type='ASYNC_CLIENT',
server_type='ASYNC_SERVER',
unconstrained_client='async',
secure=secure,
client_threads_per_cq=2, server_threads_per_cq=2,
categories=smoketest_categories+[SCALABLE])
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
unconstrained_client='async-1core', use_generic_payload=True,
unconstrained_client='async-limited', use_generic_payload=True,
async_server_threads=1,
secure=secure)
@ -753,7 +817,7 @@ class JavaLanguage:
yield _ping_pong_scenario(
'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
unconstrained_client='async-1core', use_generic_payload=True,
unconstrained_client='async-limited', use_generic_payload=True,
async_server_threads=1,
secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)

Loading…
Cancel
Save