diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 8d661e04c69..5b6c666950a 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -47,7 +47,9 @@ typedef struct { grpc_endpoint base; + double bytes_per_second; grpc_endpoint *wrapped; + gpr_timespec last_write; gpr_mu mu; grpc_slice_buffer write_buffer; @@ -68,8 +70,11 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { grpc_slice_ref_internal(slices->slices[i]); } - grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); gpr_mu_lock(&te->mu); + if (te->write_buffer.length == 0) { + te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); + } + grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); gpr_mu_unlock(&te->mu); } @@ -147,10 +152,12 @@ static const grpc_endpoint_vtable vtable = {te_read, te_get_peer, te_get_fd}; -grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) { +grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, + double bytes_per_second) { trickle_endpoint *te = gpr_malloc(sizeof(*te)); te->base.vtable = &vtable; te->wrapped = wrap; + te->bytes_per_second = bytes_per_second; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -159,18 +166,27 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) { return &te->base; } -size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - size_t bytes) { +static double ts2dbl(gpr_timespec s) { return s.tv_sec + 1e-9 * s.tv_nsec; } + +size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); - if (bytes > 0 && !te->writing) { - grpc_slice_buffer_move_first(&te->write_buffer, - GPR_MIN(bytes, te->write_buffer.length), - &te->writing_buffer); - te->writing = true; - grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, - grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + if (!te->writing && te->write_buffer.length > 0) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + double elapsed = ts2dbl(gpr_time_sub(now, te->last_write)); + size_t bytes = (size_t)(te->bytes_per_second * elapsed); + // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes); + if (bytes > 0) { + grpc_slice_buffer_move_first(&te->write_buffer, + GPR_MIN(bytes, te->write_buffer.length), + &te->writing_buffer); + te->writing = true; + te->last_write = now; + grpc_endpoint_write( + exec_ctx, te->wrapped, &te->writing_buffer, + grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + } } size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h index 5f16818ebb7..7e8d9d91e33 100644 --- a/test/core/util/trickle_endpoint.h +++ b/test/core/util/trickle_endpoint.h @@ -36,10 +36,11 @@ #include "src/core/lib/iomgr/endpoint.h" -grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap); +grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, + double bytes_per_second); /* Allow up to \a bytes through the endpoint. Returns the new backlog. */ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, - grpc_endpoint *endpoint, size_t bytes); + grpc_endpoint *endpoint); #endif diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index f83c3599616..b4f57c3e476 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -306,8 +306,8 @@ class InProcessCHTTP2 : public EndpointPairFixture { class TrickledCHTTP2 : public EndpointPairFixture { public: - TrickledCHTTP2(Service* service) - : EndpointPairFixture(service, MakeEndpoints()) {} + TrickledCHTTP2(Service* service, size_t megabits_per_second) + : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {} void AddToLabel(std::ostream& out, benchmark::State& state) { out << " writes/iter:" @@ -328,12 +328,12 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Step(size_t write_size) { + void Step() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - size_t client_backlog = grpc_trickle_endpoint_trickle( - &exec_ctx, endpoint_pair_.client, write_size); - size_t server_backlog = grpc_trickle_endpoint_trickle( - &exec_ctx, endpoint_pair_.server, write_size); + size_t client_backlog = + grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client); + size_t server_backlog = + grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server); grpc_exec_ctx_finish(&exec_ctx); UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, @@ -351,12 +351,13 @@ class TrickledCHTTP2 : public EndpointPairFixture { Stats client_stats_; Stats server_stats_; - grpc_endpoint_pair MakeEndpoints() { + grpc_endpoint_pair MakeEndpoints(size_t kilobits) { grpc_endpoint_pair p; grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), &stats_); - p.client = grpc_trickle_endpoint_create(p.client); - p.server = grpc_trickle_endpoint_create(p.server); + double bytes_per_second = 125 * kilobits; + p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second); + p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second); return p; } @@ -854,12 +855,13 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) { state.SetBytesProcessed(state.range(0) * state.iterations()); } -static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, - size_t size) { +static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) { while (true) { - switch (fixture->cq()->AsyncNext(t, ok, gpr_now(GPR_CLOCK_MONOTONIC))) { + switch (fixture->cq()->AsyncNext( + t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(100, GPR_TIMESPAN)))) { case CompletionQueue::TIMEOUT: - fixture->Step(size); + fixture->Step(); break; case CompletionQueue::SHUTDOWN: GPR_ASSERT(false); @@ -872,7 +874,8 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; - std::unique_ptr fixture(new TrickledCHTTP2(&service)); + std::unique_ptr fixture( + new TrickledCHTTP2(&service, state.range(1))); { EchoResponse send_response; EchoResponse recv_response; @@ -892,7 +895,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { void* t; bool ok; while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok, state.range(1)); + TrickleCQNext(fixture.get(), &t, &ok); GPR_ASSERT(ok); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); @@ -903,7 +906,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); response_rw.Write(send_response, tag(1)); while (true) { - TrickleCQNext(fixture.get(), &t, &ok, state.range(1)); + TrickleCQNext(fixture.get(), &t, &ok); if (t == tag(0)) { request_rw->Read(&recv_response, tag(0)); } else if (t == tag(1)) { @@ -916,7 +919,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { response_rw.Finish(Status::OK, tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { - TrickleCQNext(fixture.get(), &t, &ok, state.range(1)); + TrickleCQNext(fixture.get(), &t, &ok); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); @@ -1018,7 +1021,9 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) static void TrickleArgs(benchmark::internal::Benchmark* b) { for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { - for (int j = 1024; j <= 8 * 1024 * 1024; j *= 8) { + for (int j = 1; j <= 40000; j *= 8) { + double expected_time = (double)(20 + i) / (125 * (double)j); + if (expected_time > 0.1) continue; b->Args({i, j}); } }