|
|
|
@ -53,7 +53,7 @@ DEFINE_int32( |
|
|
|
|
"Number of megabytes to pump before collecting flow control stats"); |
|
|
|
|
DEFINE_int32( |
|
|
|
|
warmup_iterations, 100, |
|
|
|
|
"Number of megabytes to pump before collecting flow control stats"); |
|
|
|
|
"Number of iterations to run before collecting flow control stats"); |
|
|
|
|
DEFINE_int32(warmup_max_time_seconds, 10, |
|
|
|
|
"Maximum number of seconds to run warmup loop"); |
|
|
|
|
|
|
|
|
@ -77,13 +77,14 @@ static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) { |
|
|
|
|
|
|
|
|
|
class TrickledCHTTP2 : public EndpointPairFixture { |
|
|
|
|
public: |
|
|
|
|
TrickledCHTTP2(Service* service, size_t message_size, |
|
|
|
|
size_t kilobits_per_second) |
|
|
|
|
TrickledCHTTP2(Service* service, bool streaming, size_t req_size, |
|
|
|
|
size_t resp_size, size_t kilobits_per_second) |
|
|
|
|
: EndpointPairFixture(service, MakeEndpoints(kilobits_per_second), |
|
|
|
|
FixtureConfiguration()) { |
|
|
|
|
if (FLAGS_log) { |
|
|
|
|
std::ostringstream fn; |
|
|
|
|
fn << "trickle." << message_size << "." << kilobits_per_second << ".csv"; |
|
|
|
|
fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size |
|
|
|
|
<< "." << resp_size << "." << kilobits_per_second << ".csv"; |
|
|
|
|
log_.reset(new std::ofstream(fn.str().c_str())); |
|
|
|
|
write_csv(log_.get(), "t", "iteration", "client_backlog", |
|
|
|
|
"server_backlog", "client_t_stall", "client_s_stall", |
|
|
|
@ -242,8 +243,9 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, |
|
|
|
|
|
|
|
|
|
static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { |
|
|
|
|
EchoTestService::AsyncService service; |
|
|
|
|
std::unique_ptr<TrickledCHTTP2> fixture( |
|
|
|
|
new TrickledCHTTP2(&service, state.range(0), state.range(1))); |
|
|
|
|
std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( |
|
|
|
|
&service, true, state.range(0) /* req_size */, |
|
|
|
|
state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */)); |
|
|
|
|
{ |
|
|
|
|
EchoResponse send_response; |
|
|
|
|
EchoResponse recv_response; |
|
|
|
@ -314,11 +316,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { |
|
|
|
|
state.SetBytesProcessed(state.range(0) * state.iterations()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* CONFIGURATIONS |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void TrickleArgs(benchmark::internal::Benchmark* b) { |
|
|
|
|
static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) { |
|
|
|
|
for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { |
|
|
|
|
for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { |
|
|
|
|
double expected_time = |
|
|
|
@ -328,8 +326,111 @@ static void TrickleArgs(benchmark::internal::Benchmark* b) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs); |
|
|
|
|
|
|
|
|
|
BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs); |
|
|
|
|
static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { |
|
|
|
|
EchoTestService::AsyncService service; |
|
|
|
|
std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( |
|
|
|
|
&service, true, state.range(0) /* req_size */, |
|
|
|
|
state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */)); |
|
|
|
|
EchoRequest send_request; |
|
|
|
|
EchoResponse send_response; |
|
|
|
|
EchoResponse recv_response; |
|
|
|
|
if (state.range(0) > 0) { |
|
|
|
|
send_request.set_message(std::string(state.range(0), 'a')); |
|
|
|
|
} |
|
|
|
|
if (state.range(1) > 0) { |
|
|
|
|
send_response.set_message(std::string(state.range(1), 'a')); |
|
|
|
|
} |
|
|
|
|
Status recv_status; |
|
|
|
|
struct ServerEnv { |
|
|
|
|
ServerContext ctx; |
|
|
|
|
EchoRequest recv_request; |
|
|
|
|
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; |
|
|
|
|
ServerEnv() : response_writer(&ctx) {} |
|
|
|
|
}; |
|
|
|
|
uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; |
|
|
|
|
ServerEnv* server_env[2] = { |
|
|
|
|
reinterpret_cast<ServerEnv*>(server_env_buffer), |
|
|
|
|
reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; |
|
|
|
|
new (server_env[0]) ServerEnv; |
|
|
|
|
new (server_env[1]) ServerEnv; |
|
|
|
|
service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, |
|
|
|
|
&server_env[0]->response_writer, fixture->cq(), |
|
|
|
|
fixture->cq(), tag(0)); |
|
|
|
|
service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, |
|
|
|
|
&server_env[1]->response_writer, fixture->cq(), |
|
|
|
|
fixture->cq(), tag(1)); |
|
|
|
|
std::unique_ptr<EchoTestService::Stub> stub( |
|
|
|
|
EchoTestService::NewStub(fixture->channel())); |
|
|
|
|
auto inner_loop = [&](bool in_warmup) { |
|
|
|
|
GPR_TIMER_SCOPE("BenchmarkCycle", 0); |
|
|
|
|
recv_response.Clear(); |
|
|
|
|
ClientContext cli_ctx; |
|
|
|
|
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( |
|
|
|
|
stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); |
|
|
|
|
void* t; |
|
|
|
|
bool ok; |
|
|
|
|
TrickleCQNext(fixture.get(), &t, &ok, state.iterations()); |
|
|
|
|
GPR_ASSERT(ok); |
|
|
|
|
GPR_ASSERT(t == tag(0) || t == tag(1)); |
|
|
|
|
intptr_t slot = reinterpret_cast<intptr_t>(t); |
|
|
|
|
ServerEnv* senv = server_env[slot]; |
|
|
|
|
senv->response_writer.Finish(send_response, Status::OK, tag(3)); |
|
|
|
|
response_reader->Finish(&recv_response, &recv_status, tag(4)); |
|
|
|
|
for (int i = (1 << 3) | (1 << 4); i != 0;) { |
|
|
|
|
TrickleCQNext(fixture.get(), &t, &ok, state.iterations()); |
|
|
|
|
GPR_ASSERT(ok); |
|
|
|
|
int tagnum = (int)reinterpret_cast<intptr_t>(t); |
|
|
|
|
GPR_ASSERT(i & (1 << tagnum)); |
|
|
|
|
i -= 1 << tagnum; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(recv_status.ok()); |
|
|
|
|
|
|
|
|
|
senv->~ServerEnv(); |
|
|
|
|
senv = new (senv) ServerEnv(); |
|
|
|
|
service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, |
|
|
|
|
fixture->cq(), fixture->cq(), tag(slot)); |
|
|
|
|
}; |
|
|
|
|
gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); |
|
|
|
|
for (int i = 0; |
|
|
|
|
i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * |
|
|
|
|
1024 / (14 + state.range(0))); |
|
|
|
|
i++) { |
|
|
|
|
inner_loop(true); |
|
|
|
|
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), |
|
|
|
|
gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, |
|
|
|
|
GPR_TIMESPAN)) > 0) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
while (state.KeepRunning()) { |
|
|
|
|
inner_loop(false); |
|
|
|
|
} |
|
|
|
|
fixture->Finish(state); |
|
|
|
|
fixture.reset(); |
|
|
|
|
server_env[0]->~ServerEnv(); |
|
|
|
|
server_env[1]->~ServerEnv(); |
|
|
|
|
state.SetBytesProcessed(state.range(0) * state.iterations() + |
|
|
|
|
state.range(1) * state.iterations()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { |
|
|
|
|
const int cli_1024k = 1024 * 1024; |
|
|
|
|
const int cli_32M = 32 * 1024 * 1024; |
|
|
|
|
const int svr_256k = 256 * 1024; |
|
|
|
|
const int svr_4M = 4 * 1024 * 1024; |
|
|
|
|
const int svr_64M = 64 * 1024 * 1024; |
|
|
|
|
for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { |
|
|
|
|
b->Args({bw, cli_1024k, svr_256k}); |
|
|
|
|
b->Args({bw, cli_1024k, svr_4M}); |
|
|
|
|
b->Args({bw, cli_1024k, svr_64M}); |
|
|
|
|
b->Args({bw, cli_32M, svr_256k}); |
|
|
|
|
b->Args({bw, cli_32M, svr_4M}); |
|
|
|
|
b->Args({bw, cli_32M, svr_64M}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|