From 4cb0a7a5a12802f7deaf77659adb9b07ee58393f Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Mon, 13 Feb 2017 17:14:20 -0800
Subject: [PATCH] Better test

---
 test/core/util/trickle_endpoint.c        | 40 +++++++++++++++-------
 test/core/util/trickle_endpoint.h        |  5 +--
 test/cpp/microbenchmarks/bm_fullstack.cc | 43 +++++++++++++-----------
 3 files changed, 55 insertions(+), 33 deletions(-)

diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
index 8d661e04c69..5b6c666950a 100644
--- a/test/core/util/trickle_endpoint.c
+++ b/test/core/util/trickle_endpoint.c
@@ -47,7 +47,9 @@
 
 typedef struct {
   grpc_endpoint base;
+  double bytes_per_second;
   grpc_endpoint *wrapped;
+  gpr_timespec last_write;
 
   gpr_mu mu;
   grpc_slice_buffer write_buffer;
@@ -68,8 +70,11 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   for (size_t i = 0; i < slices->count; i++) {
     grpc_slice_ref_internal(slices->slices[i]);
   }
-  grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
   gpr_mu_lock(&te->mu);
+  if (te->write_buffer.length == 0) {
+    te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
+  }
+  grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
   grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error));
   gpr_mu_unlock(&te->mu);
 }
@@ -147,10 +152,12 @@ static const grpc_endpoint_vtable vtable = {te_read,
                                             te_get_peer,
                                             te_get_fd};
 
-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+                                            double bytes_per_second) {
   trickle_endpoint *te = gpr_malloc(sizeof(*te));
   te->base.vtable = &vtable;
   te->wrapped = wrap;
+  te->bytes_per_second = bytes_per_second;
   gpr_mu_init(&te->mu);
   grpc_slice_buffer_init(&te->write_buffer);
   grpc_slice_buffer_init(&te->writing_buffer);
@@ -159,18 +166,27 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
   return &te->base;
 }
 
-size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
-                                     size_t bytes) {
+static double ts2dbl(gpr_timespec s) { return s.tv_sec + 1e-9 * s.tv_nsec; }
+
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
+                                     grpc_endpoint *ep) {
   trickle_endpoint *te = (trickle_endpoint *)ep;
   gpr_mu_lock(&te->mu);
-  if (bytes > 0 && !te->writing) {
-    grpc_slice_buffer_move_first(&te->write_buffer,
-                                 GPR_MIN(bytes, te->write_buffer.length),
-                                 &te->writing_buffer);
-    te->writing = true;
-    grpc_endpoint_write(
-        exec_ctx, te->wrapped, &te->writing_buffer,
-        grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+  if (!te->writing && te->write_buffer.length > 0) {
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    double elapsed = ts2dbl(gpr_time_sub(now, te->last_write));
+    size_t bytes = (size_t)(te->bytes_per_second * elapsed);
+    // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes);
+    if (bytes > 0) {
+      grpc_slice_buffer_move_first(&te->write_buffer,
+                                   GPR_MIN(bytes, te->write_buffer.length),
+                                   &te->writing_buffer);
+      te->writing = true;
+      te->last_write = now;
+      grpc_endpoint_write(
+          exec_ctx, te->wrapped, &te->writing_buffer,
+          grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+    }
   }
   size_t backlog = te->write_buffer.length;
   gpr_mu_unlock(&te->mu);
diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h
index 5f16818ebb7..7e8d9d91e33 100644
--- a/test/core/util/trickle_endpoint.h
+++ b/test/core/util/trickle_endpoint.h
@@ -36,10 +36,11 @@
 
 #include "src/core/lib/iomgr/endpoint.h"
 
-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap);
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+                                            double bytes_per_second);
 
 /* Allow up to \a bytes through the endpoint. Returns the new backlog. */
 size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
-                                     grpc_endpoint *endpoint, size_t bytes);
+                                     grpc_endpoint *endpoint);
 
 #endif
diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc
index f83c3599616..b4f57c3e476 100644
--- a/test/cpp/microbenchmarks/bm_fullstack.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack.cc
@@ -306,8 +306,8 @@ class InProcessCHTTP2 : public EndpointPairFixture {
 
 class TrickledCHTTP2 : public EndpointPairFixture {
  public:
-  TrickledCHTTP2(Service* service)
-      : EndpointPairFixture(service, MakeEndpoints()) {}
+  TrickledCHTTP2(Service* service, size_t megabits_per_second)
+      : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {}
 
   void AddToLabel(std::ostream& out, benchmark::State& state) {
     out << " writes/iter:"
@@ -328,12 +328,12 @@ class TrickledCHTTP2 : public EndpointPairFixture {
             (double)state.iterations());
   }
 
-  void Step(size_t write_size) {
+  void Step() {
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    size_t client_backlog = grpc_trickle_endpoint_trickle(
-        &exec_ctx, endpoint_pair_.client, write_size);
-    size_t server_backlog = grpc_trickle_endpoint_trickle(
-        &exec_ctx, endpoint_pair_.server, write_size);
+    size_t client_backlog =
+        grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client);
+    size_t server_backlog =
+        grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server);
     grpc_exec_ctx_finish(&exec_ctx);
 
     UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
@@ -351,12 +351,13 @@ class TrickledCHTTP2 : public EndpointPairFixture {
   Stats client_stats_;
   Stats server_stats_;
 
-  grpc_endpoint_pair MakeEndpoints() {
+  grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
     grpc_endpoint_pair p;
     grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
                                   &stats_);
-    p.client = grpc_trickle_endpoint_create(p.client);
-    p.server = grpc_trickle_endpoint_create(p.server);
+    double bytes_per_second = 125 * kilobits;
+    p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second);
+    p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second);
     return p;
   }
 
@@ -854,12 +855,13 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) {
   state.SetBytesProcessed(state.range(0) * state.iterations());
 }
 
-static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
-                          size_t size) {
+static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
   while (true) {
-    switch (fixture->cq()->AsyncNext(t, ok, gpr_now(GPR_CLOCK_MONOTONIC))) {
+    switch (fixture->cq()->AsyncNext(
+        t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                            gpr_time_from_micros(100, GPR_TIMESPAN)))) {
       case CompletionQueue::TIMEOUT:
-        fixture->Step(size);
+        fixture->Step();
         break;
       case CompletionQueue::SHUTDOWN:
         GPR_ASSERT(false);
@@ -872,7 +874,8 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
 
 static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
   EchoTestService::AsyncService service;
-  std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(&service));
+  std::unique_ptr<TrickledCHTTP2> fixture(
+      new TrickledCHTTP2(&service, state.range(1)));
   {
     EchoResponse send_response;
     EchoResponse recv_response;
@@ -892,7 +895,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
     void* t;
     bool ok;
     while (need_tags) {
-      TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+      TrickleCQNext(fixture.get(), &t, &ok);
       GPR_ASSERT(ok);
       int i = (int)(intptr_t)t;
       GPR_ASSERT(need_tags & (1 << i));
@@ -903,7 +906,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
       GPR_TIMER_SCOPE("BenchmarkCycle", 0);
       response_rw.Write(send_response, tag(1));
       while (true) {
-        TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+        TrickleCQNext(fixture.get(), &t, &ok);
         if (t == tag(0)) {
           request_rw->Read(&recv_response, tag(0));
         } else if (t == tag(1)) {
@@ -916,7 +919,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
     response_rw.Finish(Status::OK, tag(1));
     need_tags = (1 << 0) | (1 << 1);
     while (need_tags) {
-      TrickleCQNext(fixture.get(), &t, &ok, state.range(1));
+      TrickleCQNext(fixture.get(), &t, &ok);
       int i = (int)(intptr_t)t;
       GPR_ASSERT(need_tags & (1 << i));
       need_tags &= ~(1 << i);
@@ -1018,7 +1021,9 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
 
 static void TrickleArgs(benchmark::internal::Benchmark* b) {
   for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
-    for (int j = 1024; j <= 8 * 1024 * 1024; j *= 8) {
+    for (int j = 1; j <= 40000; j *= 8) {
+      double expected_time = (double)(20 + i) / (125 * (double)j);
+      if (expected_time > 0.1) continue;
       b->Args({i, j});
     }
   }