From e46f2d2c8fea5aa61c72f1fe1696aac9fdef9efe Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 20 Sep 2017 14:24:19 -0700
Subject: [PATCH 1/4] Fix memory leak

---
 src/core/lib/iomgr/resource_quota.c | 31 +++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 4895e0d1c92..2be2cad0ad2 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -87,6 +87,8 @@ struct grpc_resource_user {
   grpc_closure_list on_allocated;
   /* True if we are currently trying to allocate from the quota, false if not */
   bool allocating;
+  /* How many bytes of allocations are outstanding */
+  int64_t outstanding_allocations;
   /* True if we are currently trying to add ourselves to the non-free quota
      list, false otherwise */
   bool added_to_free_pool;
@@ -151,6 +153,9 @@ struct grpc_resource_quota {
   char *name;
 };
 
+static void ru_unref_by(grpc_exec_ctx *exec_ctx,
+                        grpc_resource_user *resource_user, gpr_atm amount);
+
 /*******************************************************************************
  * list management
  */
@@ -287,6 +292,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
   while ((resource_user = rulist_pop_head(resource_quota,
                                           GRPC_RULIST_AWAITING_ALLOCATION))) {
     gpr_mu_lock(&resource_user->mu);
+    if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+      gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR
+                         " free_pool=%" PRId64,
+              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
+              resource_user->free_pool);
+    }
+    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
+      resource_user->allocating = false;
+      grpc_closure_list_fail_all(
+          &resource_user->on_allocated,
+          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
+      int64_t aborted_allocations = resource_user->outstanding_allocations;
+      resource_user->outstanding_allocations = 0;
+      resource_user->free_pool += aborted_allocations;
+      GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
+      gpr_mu_unlock(&resource_user->mu);
+      ru_unref_by(exec_ctx, resource_user, aborted_allocations);
+      continue;
+    }
     if (resource_user->free_pool < 0 &&
         -resource_user->free_pool <= resource_quota->free_pool) {
       int64_t amt = -resource_user->free_pool;
@@ -306,6 +330,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
     }
     if (resource_user->free_pool >= 0) {
       resource_user->allocating = false;
+      resource_user->outstanding_allocations = 0;
       GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
       gpr_mu_unlock(&resource_user->mu);
     } else {
@@ -486,6 +511,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
 }
 
 static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
+  if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+    gpr_log(GPR_DEBUG, "RU shutdown %p", ru);
+  }
   grpc_resource_user *resource_user = (grpc_resource_user *)ru;
   GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
                      GRPC_ERROR_CANCELLED);
@@ -716,6 +744,7 @@ grpc_resource_user *grpc_resource_user_create(
   resource_user->reclaimers[1] = NULL;
   resource_user->new_reclaimers[0] = NULL;
   resource_user->new_reclaimers[1] = NULL;
+  resource_user->outstanding_allocations = 0;
   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
     resource_user->links[i].next = resource_user->links[i].prev = NULL;
   }
@@ -776,6 +805,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
   gpr_mu_lock(&resource_user->mu);
   ru_ref_by(resource_user, (gpr_atm)size);
   resource_user->free_pool -= (int64_t)size;
+  resource_user->outstanding_allocations += (int64_t)size;
   if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
     gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
             resource_user->resource_quota->name, resource_user->name, size,
@@ -790,6 +820,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
                          GRPC_ERROR_NONE);
     }
   } else {
+    resource_user->outstanding_allocations -= (int64_t)size;
     GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
   }
   gpr_mu_unlock(&resource_user->mu);

From a42a22b2bec661c5c46c8f884a98f7c004cd5b3a Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 20 Sep 2017 16:08:25 -0700
Subject: [PATCH 2/4] Update window overflow test

---
 test/core/bad_client/bad_client.c            | 15 ++++++++++-----
 test/core/bad_client/tests/window_overflow.c |  2 +-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 383d1240cb0..888a7c85a91 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -84,13 +84,18 @@ void grpc_run_bad_client_test(
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_completion_queue *shutdown_cq;
 
-  hex = gpr_dump(client_payload, client_payload_length,
-                 GPR_DUMP_HEX | GPR_DUMP_ASCII);
+  if (client_payload_length < 4 * 1024) {
+    hex = gpr_dump(client_payload, client_payload_length,
+                   GPR_DUMP_HEX | GPR_DUMP_ASCII);
 
-  /* Add a debug log */
-  gpr_log(GPR_INFO, "TEST: %s", hex);
+    /* Add a debug log */
+    gpr_log(GPR_INFO, "TEST: %s", hex);
 
-  gpr_free(hex);
+    gpr_free(hex);
+  } else {
+    gpr_log(GPR_INFO, "TEST: (%" PRIdPTR " byte long string)",
+            client_payload_length);
+  }
 
   /* Init grpc */
   grpc_init();
diff --git a/test/core/bad_client/tests/window_overflow.c b/test/core/bad_client/tests/window_overflow.c
index 1f29bd32fbd..9ced956957d 100644
--- a/test/core/bad_client/tests/window_overflow.c
+++ b/test/core/bad_client/tests/window_overflow.c
@@ -69,7 +69,7 @@ int main(int argc, char **argv) {
 #define MAX_FRAME_SIZE 16384
 #define MESSAGES_PER_FRAME (MAX_FRAME_SIZE / 5)
 #define FRAME_SIZE (MESSAGES_PER_FRAME * 5)
-#define SEND_SIZE (100 * 1024)
+#define SEND_SIZE (6 * 1024 * 1024)
 #define NUM_FRAMES (SEND_SIZE / FRAME_SIZE + 1)
   grpc_test_init(argc, argv);
 

From c1f288dedbe8eeb9591b78e305a277ddeb1f9f39 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 20 Sep 2017 21:59:21 -0700
Subject: [PATCH 3/4] Bug fixes, remove async e2e dependence on flow control
 size

---
 src/core/lib/iomgr/resource_quota.c      |   2 +-
 test/core/transport/bdp_estimator_test.c |   2 +
 test/cpp/end2end/async_end2end_test.cc   | 150 +++++++++++------------
 3 files changed, 76 insertions(+), 78 deletions(-)

diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 2be2cad0ad2..e0a9829cd73 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -308,7 +308,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
       resource_user->free_pool += aborted_allocations;
       GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
       gpr_mu_unlock(&resource_user->mu);
-      ru_unref_by(exec_ctx, resource_user, aborted_allocations);
+      ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations);
       continue;
     }
     if (resource_user->free_pool < 0 &&
diff --git a/test/core/transport/bdp_estimator_test.c b/test/core/transport/bdp_estimator_test.c
index a256eb3a4a8..c672b535181 100644
--- a/test/core/transport/bdp_estimator_test.c
+++ b/test/core/transport/bdp_estimator_test.c
@@ -24,6 +24,7 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/useful.h>
 #include <limits.h>
+#include "src/core/lib/iomgr/timer_manager.h"
 #include "src/core/lib/support/string.h"
 #include "test/core/util/test_config.h"
 
@@ -145,6 +146,7 @@ int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   gpr_now_impl = fake_gpr_now;
   grpc_init();
+  grpc_timer_manager_set_threading(false);
   test_noop();
   test_get_estimate_no_samples();
   test_get_estimate_1_sample();
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 41090d161aa..e2531ef95af 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -105,6 +105,13 @@ class Verifier {
     expectations_[tag(i)] = expect_ok;
     return *this;
   }
+  // AcceptOnce sets the expected ok value for a specific tag, but does not
+  // require it to appear
+  // If it does, sets *seen to true
+  Verifier& AcceptOnce(int i, bool expect_ok, bool* seen) {
+    maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
+    return *this;
+  }
 
   // Next waits for 1 async tag to complete, checks its
   // expectations, and returns the tag
@@ -122,12 +129,7 @@ class Verifier {
     } else {
       EXPECT_TRUE(cq->Next(&got_tag, &ok));
     }
-    auto it = expectations_.find(got_tag);
-    EXPECT_TRUE(it != expectations_.end());
-    if (!ignore_ok) {
-      EXPECT_EQ(it->second, ok);
-    }
-    expectations_.erase(it);
+    GotTag(got_tag, ok, ignore_ok);
     return detag(got_tag);
   }
 
@@ -138,7 +140,7 @@ class Verifier {
   // This version of Verify allows optionally ignoring the
   // outcome of the expectation
   void Verify(CompletionQueue* cq, bool ignore_ok) {
-    GPR_ASSERT(!expectations_.empty());
+    GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
     while (!expectations_.empty()) {
       Next(cq, ignore_ok);
     }
@@ -177,16 +179,43 @@ class Verifier {
           EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
                     CompletionQueue::GOT_EVENT);
         }
-        auto it = expectations_.find(got_tag);
-        EXPECT_TRUE(it != expectations_.end());
-        EXPECT_EQ(it->second, ok);
-        expectations_.erase(it);
+        GotTag(got_tag, ok, false);
       }
     }
   }
 
  private:
+  void GotTag(void* got_tag, bool ok, bool ignore_ok) {
+    auto it = expectations_.find(got_tag);
+    if (it != expectations_.end()) {
+      if (!ignore_ok) {
+        EXPECT_EQ(it->second, ok);
+      }
+      expectations_.erase(it);
+    } else {
+      auto it2 = maybe_expectations_.find(got_tag);
+      if (it2 != maybe_expectations_.end()) {
+        if (it2->second.seen != nullptr) {
+          EXPECT_FALSE(*it2->second.seen);
+          *it2->second.seen = true;
+        }
+        if (!ignore_ok) {
+          EXPECT_EQ(it2->second.ok, ok);
+        }
+      } else {
+        gpr_log(GPR_ERROR, "Unexpected tag: %p", tag);
+        abort();
+      }
+    }
+  }
+
+  struct MaybeExpect {
+    bool ok;
+    bool* seen;
+  };
+
   std::map<void*, bool> expectations_;
+  std::map<void*, MaybeExpect> maybe_expectations_;
   bool spin_;
 };
 
@@ -539,31 +568,19 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
 
   cli_stream->Write(send_request, tag(3));
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
-  }
+  bool seen3 = false;
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(2, true)
+      .AcceptOnce(3, true, &seen3)
+      .Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
 
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
+  Verifier(GetParam().disable_blocking)
+      .AcceptOnce(3, true, &seen3)
+      .Expect(4, true)
+      .Verify(cq_.get());
 
   EXPECT_EQ(send_request.message(), recv_request.message());
 
@@ -588,6 +605,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
+  EXPECT_TRUE(seen3);
 }
 
 // One ping, two pongs.
@@ -834,31 +852,19 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
 
   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
-  }
+  bool seen3 = false;
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(2, true)
+      .AcceptOnce(3, true, &seen3)
+      .Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
 
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
+  Verifier(GetParam().disable_blocking)
+      .AcceptOnce(3, true, &seen3)
+      .Expect(4, true)
+      .Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   srv_stream.Read(&recv_request, tag(5));
@@ -877,6 +883,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
   Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
+  EXPECT_TRUE(seen3);
 }
 
 // One ping, one pong. Using server:WriteLast api
@@ -902,31 +909,19 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
 
   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
 
-  // 65536(64KB) is the default flow control window size. Should change this
-  // number when default flow control window size changes. For the write of
-  // send_request larger than the flow control window size, tag:3 will not come
-  // up until server read is initiated. For write of send_request smaller than
-  // the flow control window size, the request can take the free ride with
-  // initial metadata due to coalescing, thus write tag:3 will come up here.
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking)
-        .Expect(2, true)
-        .Expect(3, true)
-        .Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
-  }
+  bool seen3 = false;
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(2, true)
+      .AcceptOnce(3, true, &seen3)
+      .Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
 
-  if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
-    Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
-  } else {
-    Verifier(GetParam().disable_blocking)
-        .Expect(3, true)
-        .Expect(4, true)
-        .Verify(cq_.get());
-  }
+  Verifier(GetParam().disable_blocking)
+      .AcceptOnce(3, true, &seen3)
+      .Expect(4, true)
+      .Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   srv_stream.Read(&recv_request, tag(5));
@@ -947,6 +942,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
   Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
+  EXPECT_TRUE(seen3);
 }
 
 // Metadata tests

From ddc81ca5566e463c5139b37248347aad2315bf63 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 21 Sep 2017 12:06:29 -0700
Subject: [PATCH 4/4] Fix resource user shutdown path while allocating

---
 src/core/lib/iomgr/resource_quota.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index e0a9829cd73..85817080d62 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -523,6 +523,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
   resource_user->reclaimers[1] = NULL;
   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
+  if (resource_user->allocating) {
+    rq_step_sched(exec_ctx, resource_user->resource_quota);
+  }
 }
 
 static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {