Merge branch 'bm_trickle' of github.com:ctiller/grpc into bm_trickle

pull/9697/head
Craig Tiller
commit 22482a41b6

Changed files:
  test/core/util/trickle_endpoint.c         (42 lines changed)
  test/core/util/trickle_endpoint.h          (5 lines changed)
  test/cpp/microbenchmarks/bm_fullstack.cc  (44 lines changed)

test/core/util/trickle_endpoint.c

@@ -47,7 +47,9 @@
 typedef struct {
   grpc_endpoint base;
+  double bytes_per_second;
   grpc_endpoint *wrapped;
+  gpr_timespec last_write;
   gpr_mu mu;
   grpc_slice_buffer write_buffer;
@@ -68,8 +70,11 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   for (size_t i = 0; i < slices->count; i++) {
     grpc_slice_ref_internal(slices->slices[i]);
   }
-  grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
   gpr_mu_lock(&te->mu);
+  if (te->write_buffer.length == 0) {
+    te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
+  }
+  grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
   grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error));
   gpr_mu_unlock(&te->mu);
 }
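
Note on this hunk: the grpc_slice_buffer_addn() call moves under te->mu (it previously mutated write_buffer outside the lock), and last_write is stamped whenever the buffer goes from empty to non-empty. Without that stamp, idle time would count as pacing credit and the next trickle would release one large burst. A standalone sketch of that failure mode, with made-up numbers:

  #include <stdio.h>

  int main(void) {
    double rate = 1e6;     /* bytes per second allowed through */
    double idle_gap = 5.0; /* seconds the write buffer sat empty */
    /* credit = rate * elapsed; if the idle gap counted as elapsed,
       the first release after new data arrived would be a ~5MB burst */
    printf("burst if last_write were stale: %.0f bytes\n", rate * idle_gap);
    return 0;
  }
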
@@ -147,10 +152,12 @@ static const grpc_endpoint_vtable vtable = {te_read,
                                             te_get_peer,
                                             te_get_fd};

-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+                                            double bytes_per_second) {
   trickle_endpoint *te = gpr_malloc(sizeof(*te));
   te->base.vtable = &vtable;
   te->wrapped = wrap;
+  te->bytes_per_second = bytes_per_second;
   gpr_mu_init(&te->mu);
   grpc_slice_buffer_init(&te->write_buffer);
   grpc_slice_buffer_init(&te->writing_buffer);
@@ -159,18 +166,29 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
   return &te->base;
 }

-size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
-                                     size_t bytes) {
+static double ts2dbl(gpr_timespec s) {
+  return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec;
+}
+
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
+                                     grpc_endpoint *ep) {
   trickle_endpoint *te = (trickle_endpoint *)ep;
   gpr_mu_lock(&te->mu);
-  if (bytes > 0 && !te->writing) {
-    grpc_slice_buffer_move_first(&te->write_buffer,
-                                 GPR_MIN(bytes, te->write_buffer.length),
-                                 &te->writing_buffer);
-    te->writing = true;
-    grpc_endpoint_write(
-        exec_ctx, te->wrapped, &te->writing_buffer,
-        grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+  if (!te->writing && te->write_buffer.length > 0) {
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    double elapsed = ts2dbl(gpr_time_sub(now, te->last_write));
+    size_t bytes = (size_t)(te->bytes_per_second * elapsed);
+    // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes);
+    if (bytes > 0) {
+      grpc_slice_buffer_move_first(&te->write_buffer,
+                                   GPR_MIN(bytes, te->write_buffer.length),
+                                   &te->writing_buffer);
+      te->writing = true;
+      te->last_write = now;
+      grpc_endpoint_write(
+          exec_ctx, te->wrapped, &te->writing_buffer,
+          grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+    }
   }
   size_t backlog = te->write_buffer.length;
   gpr_mu_unlock(&te->mu);
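
The trickle call no longer takes a byte budget from its caller; it computes one as elapsed time since the last release times bytes_per_second, with ts2dbl() flattening the gpr_timespec into fractional seconds. A minimal standalone model of that computation (the names here are illustrative, not part of the patch):

  #include <stddef.h>

  typedef struct {
    double bytes_per_second; /* configured trickle rate */
    double last_write_sec;   /* when bytes were last released */
  } pacing_state;

  /* Returns how many buffered bytes may be released at now_sec; the
     clock only advances when something is actually released, mirroring
     te->last_write = now inside the bytes > 0 branch above. */
  static size_t pacing_release(pacing_state* p, double now_sec,
                               size_t backlog) {
    size_t budget =
        (size_t)(p->bytes_per_second * (now_sec - p->last_write_sec));
    if (budget == 0) return 0;
    p->last_write_sec = now_sec;
    return budget < backlog ? budget : backlog;
  }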

test/core/util/trickle_endpoint.h

@@ -36,10 +36,11 @@
 #include "src/core/lib/iomgr/endpoint.h"

-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap);
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+                                            double bytes_per_second);

-/* Allow up to \a bytes through the endpoint. Returns the new backlog. */
+/* Allow bytes through at the configured rate. Returns the new backlog. */
 size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
-                                     grpc_endpoint *endpoint, size_t bytes);
+                                     grpc_endpoint *endpoint);

 #endif
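
Call sites now look roughly like this: wrap an endpoint with a rate, then poke it periodically until the backlog drains. A sketch of the shape, assuming `ep` and `exec_ctx` come from the surrounding test, and that the caller waits between calls rather than spinning:

  grpc_endpoint* paced =
      grpc_trickle_endpoint_create(ep, 125000.0 /* 1 Mbit/s */);
  /* ... write to `paced` as usual; then, from a timer or poll loop: */
  size_t backlog = grpc_trickle_endpoint_trickle(exec_ctx, paced);
  if (backlog > 0) {
    /* not drained yet; call again once some time has elapsed */
  }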

test/cpp/microbenchmarks/bm_fullstack.cc

@@ -306,8 +306,8 @@ class InProcessCHTTP2 : public EndpointPairFixture {
 class TrickledCHTTP2 : public EndpointPairFixture {
  public:
-  TrickledCHTTP2(Service* service)
-      : EndpointPairFixture(service, MakeEndpoints()) {}
+  TrickledCHTTP2(Service* service, size_t megabits_per_second)
+      : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {}

   void AddToLabel(std::ostream& out, benchmark::State& state) {
     out << " writes/iter:"
@@ -328,12 +328,12 @@ class TrickledCHTTP2 : public EndpointPairFixture {
             (double)state.iterations());
   }

-  void Step(size_t write_size) {
+  void Step() {
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    size_t client_backlog = grpc_trickle_endpoint_trickle(
-        &exec_ctx, endpoint_pair_.client, write_size);
-    size_t server_backlog = grpc_trickle_endpoint_trickle(
-        &exec_ctx, endpoint_pair_.server, write_size);
+    size_t client_backlog =
+        grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client);
+    size_t server_backlog =
+        grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server);
     grpc_exec_ctx_finish(&exec_ctx);
     UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
@@ -351,12 +351,13 @@ class TrickledCHTTP2 : public EndpointPairFixture {
   Stats client_stats_;
   Stats server_stats_;

-  grpc_endpoint_pair MakeEndpoints() {
+  grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
     grpc_endpoint_pair p;
     grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
                                   &stats_);
-    p.client = grpc_trickle_endpoint_create(p.client);
-    p.server = grpc_trickle_endpoint_create(p.server);
+    double bytes_per_second = 125.0 * kilobits;
+    p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second);
+    p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second);
     return p;
   }
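
The 125.0 factor converts kilobits per second into bytes per second (1 kilobit = 1000 bits = 125 bytes). Note the constructor names the same value megabits_per_second while this helper calls it kilobits; the arithmetic treats it as kilobits. A quick standalone unit check:

  #include <assert.h>

  int main(void) {
    double kilobits = 1024.0; /* 1024 kbit/s */
    double bytes_per_second = 125.0 * kilobits;
    /* 1024 kbit/s = 1,024,000 bits/s = 128,000 bytes/s */
    assert(bytes_per_second == 128000.0);
    return 0;
  }
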
@@ -854,12 +855,13 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) {
   state.SetBytesProcessed(state.range(0) * state.iterations());
 }

-static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
-                          size_t size) {
+static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
   while (true) {
-    switch (fixture->cq()->AsyncNext(t, ok, gpr_now(GPR_CLOCK_MONOTONIC))) {
+    switch (fixture->cq()->AsyncNext(
+        t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                            gpr_time_from_micros(100, GPR_TIMESPAN)))) {
       case CompletionQueue::TIMEOUT:
-        fixture->Step(size);
+        fixture->Step();
         break;
       case CompletionQueue::SHUTDOWN:
         GPR_ASSERT(false);
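
TrickleCQNext() changes from a zero-deadline busy poll to AsyncNext() with a 100us deadline: each TIMEOUT wakes the loop after real time has advanced, so Step() can release another rate-proportional slice of the backlog. A toy standalone model of that poll-then-pump loop (all names hypothetical):

  #include <stdbool.h>
  #include <stdio.h>

  /* Pretend the completion arrives only after three 100us timeouts. */
  static bool completion_ready(int polls) { return polls >= 3; }

  int main(void) {
    for (int polls = 0;; polls++) {
      if (completion_ready(polls)) { /* AsyncNext -> GOT_EVENT */
        printf("event after %d timeouts\n", polls);
        break;
      }
      /* AsyncNext -> TIMEOUT: pump paced endpoints (fixture->Step()) */
      printf("timeout %d: trickle more bytes\n", polls);
    }
    return 0;
  }
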
@@ -872,7 +874,8 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
 static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
   EchoTestService::AsyncService service;
-  std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(&service));
+  std::unique_ptr<TrickledCHTTP2> fixture(
+      new TrickledCHTTP2(&service, state.range(1)));
   {
     EchoResponse send_response;
     EchoResponse recv_response;
@@ -892,7 +895,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
   void* t;
   bool ok;
   while (need_tags) {
-    TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+    TrickleCQNext(fixture.get(), &t, &ok);
     GPR_ASSERT(ok);
     int i = (int)(intptr_t)t;
     GPR_ASSERT(need_tags & (1 << i));
@@ -903,7 +906,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
     GPR_TIMER_SCOPE("BenchmarkCycle", 0);
     response_rw.Write(send_response, tag(1));
     while (true) {
-      TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+      TrickleCQNext(fixture.get(), &t, &ok);
       if (t == tag(0)) {
         request_rw->Read(&recv_response, tag(0));
       } else if (t == tag(1)) {
@@ -916,7 +919,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
   response_rw.Finish(Status::OK, tag(1));
   need_tags = (1 << 0) | (1 << 1);
   while (need_tags) {
-    TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+    TrickleCQNext(fixture.get(), &t, &ok);
     int i = (int)(intptr_t)t;
     GPR_ASSERT(need_tags & (1 << i));
     need_tags &= ~(1 << i);
@@ -1018,7 +1021,10 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
 static void TrickleArgs(benchmark::internal::Benchmark* b) {
   for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
-    for (int j = 1024; j <= 8 * 1024 * 1024; j *= 8) {
+    for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) {
+      double expected_time =
+          static_cast<double>(14 + i) / (125.0 * static_cast<double>(j));
+      if (expected_time > 0.01) continue;
       b->Args({i, j});
     }
   }
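
The new inner loop widens the bandwidth sweep (j now runs 1 through 128M kilobits/s) and prunes combinations whose per-message wire time would exceed 10ms: expected_time = (14 + i) / (125 * j), where i is the message size in bytes, 125 * j is the bandwidth in bytes/s, and the 14 presumably covers per-message framing (9-byte HTTP/2 frame header plus 5-byte gRPC length prefix). Two worked examples:

  #include <stdio.h>

  int main(void) {
    /* (i, j) pairs from the generator: i bytes/message, j kilobits/s */
    int pairs[][2] = {{512, 4096}, {32768, 64}};
    for (int k = 0; k < 2; k++) {
      double expected_time = (14.0 + pairs[k][0]) / (125.0 * pairs[k][1]);
      printf("i=%d j=%d -> %.4fs -> %s\n", pairs[k][0], pairs[k][1],
             expected_time, expected_time > 0.01 ? "pruned" : "benchmarked");
    }
    return 0; /* prints ~0.0010s (kept) and ~4.0978s (pruned) */
  }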
