Merge branch 'flowctlN' into flowctl+millis

reviewable/pr12677/r1
Craig Tiller 7 years ago
commit ced7653a1c
  1. 34
      src/core/lib/iomgr/resource_quota.c
  2. 15
      test/core/bad_client/bad_client.c
  3. 2
      test/core/bad_client/tests/window_overflow.c
  4. 2
      test/core/transport/bdp_estimator_test.c
  5. 150
      test/cpp/end2end/async_end2end_test.cc

@ -88,6 +88,8 @@ struct grpc_resource_user {
grpc_closure_list on_allocated; grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */ /* True if we are currently trying to allocate from the quota, false if not */
bool allocating; bool allocating;
/* How many bytes of allocations are outstanding */
int64_t outstanding_allocations;
/* True if we are currently trying to add ourselves to the non-free quota /* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */ list, false otherwise */
bool added_to_free_pool; bool added_to_free_pool;
@ -152,6 +154,9 @@ struct grpc_resource_quota {
char *name; char *name;
}; };
static void ru_unref_by(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, gpr_atm amount);
/******************************************************************************* /*******************************************************************************
* list management * list management
*/ */
@ -288,6 +293,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
while ((resource_user = rulist_pop_head(resource_quota, while ((resource_user = rulist_pop_head(resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION))) { GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu); gpr_mu_lock(&resource_user->mu);
if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR
" free_pool=%" PRId64,
resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
resource_user->free_pool);
}
if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
resource_user->allocating = false;
grpc_closure_list_fail_all(
&resource_user->on_allocated,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
int64_t aborted_allocations = resource_user->outstanding_allocations;
resource_user->outstanding_allocations = 0;
resource_user->free_pool += aborted_allocations;
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations);
continue;
}
if (resource_user->free_pool < 0 && if (resource_user->free_pool < 0 &&
-resource_user->free_pool <= resource_quota->free_pool) { -resource_user->free_pool <= resource_quota->free_pool) {
int64_t amt = -resource_user->free_pool; int64_t amt = -resource_user->free_pool;
@ -307,6 +331,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
} }
if (resource_user->free_pool >= 0) { if (resource_user->free_pool >= 0) {
resource_user->allocating = false; resource_user->allocating = false;
resource_user->outstanding_allocations = 0;
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu); gpr_mu_unlock(&resource_user->mu);
} else { } else {
@ -487,6 +512,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
} }
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
gpr_log(GPR_DEBUG, "RU shutdown %p", ru);
}
grpc_resource_user *resource_user = (grpc_resource_user *)ru; grpc_resource_user *resource_user = (grpc_resource_user *)ru;
GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED); GRPC_ERROR_CANCELLED);
@ -496,6 +524,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
resource_user->reclaimers[1] = NULL; resource_user->reclaimers[1] = NULL;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
if (resource_user->allocating) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
} }
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
@ -717,6 +748,7 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->reclaimers[1] = NULL; resource_user->reclaimers[1] = NULL;
resource_user->new_reclaimers[0] = NULL; resource_user->new_reclaimers[0] = NULL;
resource_user->new_reclaimers[1] = NULL; resource_user->new_reclaimers[1] = NULL;
resource_user->outstanding_allocations = 0;
for (int i = 0; i < GRPC_RULIST_COUNT; i++) { for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL; resource_user->links[i].next = resource_user->links[i].prev = NULL;
} }
@ -777,6 +809,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&resource_user->mu); gpr_mu_lock(&resource_user->mu);
ru_ref_by(resource_user, (gpr_atm)size); ru_ref_by(resource_user, (gpr_atm)size);
resource_user->free_pool -= (int64_t)size; resource_user->free_pool -= (int64_t)size;
resource_user->outstanding_allocations += (int64_t)size;
if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size, resource_user->resource_quota->name, resource_user->name, size,
@ -791,6 +824,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
} else { } else {
resource_user->outstanding_allocations -= (int64_t)size;
GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
} }
gpr_mu_unlock(&resource_user->mu); gpr_mu_unlock(&resource_user->mu);

@ -84,13 +84,18 @@ void grpc_run_bad_client_test(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_completion_queue *shutdown_cq; grpc_completion_queue *shutdown_cq;
hex = gpr_dump(client_payload, client_payload_length, if (client_payload_length < 4 * 1024) {
GPR_DUMP_HEX | GPR_DUMP_ASCII); hex = gpr_dump(client_payload, client_payload_length,
GPR_DUMP_HEX | GPR_DUMP_ASCII);
/* Add a debug log */ /* Add a debug log */
gpr_log(GPR_INFO, "TEST: %s", hex); gpr_log(GPR_INFO, "TEST: %s", hex);
gpr_free(hex); gpr_free(hex);
} else {
gpr_log(GPR_INFO, "TEST: (%" PRIdPTR " byte long string)",
client_payload_length);
}
/* Init grpc */ /* Init grpc */
grpc_init(); grpc_init();

@ -69,7 +69,7 @@ int main(int argc, char **argv) {
#define MAX_FRAME_SIZE 16384 #define MAX_FRAME_SIZE 16384
#define MESSAGES_PER_FRAME (MAX_FRAME_SIZE / 5) #define MESSAGES_PER_FRAME (MAX_FRAME_SIZE / 5)
#define FRAME_SIZE (MESSAGES_PER_FRAME * 5) #define FRAME_SIZE (MESSAGES_PER_FRAME * 5)
#define SEND_SIZE (100 * 1024) #define SEND_SIZE (6 * 1024 * 1024)
#define NUM_FRAMES (SEND_SIZE / FRAME_SIZE + 1) #define NUM_FRAMES (SEND_SIZE / FRAME_SIZE + 1)
grpc_test_init(argc, argv); grpc_test_init(argc, argv);

@ -24,6 +24,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include <limits.h> #include <limits.h>
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
@ -145,6 +146,7 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
gpr_now_impl = fake_gpr_now; gpr_now_impl = fake_gpr_now;
grpc_init(); grpc_init();
grpc_timer_manager_set_threading(false);
test_noop(); test_noop();
test_get_estimate_no_samples(); test_get_estimate_no_samples();
test_get_estimate_1_sample(); test_get_estimate_1_sample();

@ -105,6 +105,13 @@ class Verifier {
expectations_[tag(i)] = expect_ok; expectations_[tag(i)] = expect_ok;
return *this; return *this;
} }
// Registers an optional expectation for tag(i): the tag is not required to
// be delivered, but if it is, its ok value must match expect_ok and *seen
// is flipped to true so the test can assert delivery later.
Verifier& AcceptOnce(int i, bool expect_ok, bool* seen) {
  MaybeExpect entry;
  entry.ok = expect_ok;
  entry.seen = seen;
  maybe_expectations_[tag(i)] = entry;
  return *this;
}
// Next waits for 1 async tag to complete, checks its // Next waits for 1 async tag to complete, checks its
// expectations, and returns the tag // expectations, and returns the tag
@ -122,12 +129,7 @@ class Verifier {
} else { } else {
EXPECT_TRUE(cq->Next(&got_tag, &ok)); EXPECT_TRUE(cq->Next(&got_tag, &ok));
} }
auto it = expectations_.find(got_tag); GotTag(got_tag, ok, ignore_ok);
EXPECT_TRUE(it != expectations_.end());
if (!ignore_ok) {
EXPECT_EQ(it->second, ok);
}
expectations_.erase(it);
return detag(got_tag); return detag(got_tag);
} }
@ -138,7 +140,7 @@ class Verifier {
// This version of Verify allows optionally ignoring the // This version of Verify allows optionally ignoring the
// outcome of the expectation // outcome of the expectation
void Verify(CompletionQueue* cq, bool ignore_ok) { void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty()); GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
while (!expectations_.empty()) { while (!expectations_.empty()) {
Next(cq, ignore_ok); Next(cq, ignore_ok);
} }
@ -177,16 +179,43 @@ class Verifier {
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
CompletionQueue::GOT_EVENT); CompletionQueue::GOT_EVENT);
} }
auto it = expectations_.find(got_tag); GotTag(got_tag, ok, false);
EXPECT_TRUE(it != expectations_.end());
EXPECT_EQ(it->second, ok);
expectations_.erase(it);
} }
} }
} }
private: private:
// Records delivery of one completion-queue tag and checks it against the
// registered expectations.
//
// Lookup order: required expectations_ first, then optional
// maybe_expectations_. A required entry is erased once matched; an optional
// (AcceptOnce) entry additionally sets *seen, and must not have been seen
// before. A tag present in neither map aborts the test.
//
// got_tag:   the tag returned by the completion queue.
// ok:        the ok bit returned alongside the tag.
// ignore_ok: when true, skip checking ok against the expectation.
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
  auto it = expectations_.find(got_tag);
  if (it != expectations_.end()) {
    if (!ignore_ok) {
      EXPECT_EQ(it->second, ok);
    }
    expectations_.erase(it);
  } else {
    auto it2 = maybe_expectations_.find(got_tag);
    if (it2 != maybe_expectations_.end()) {
      if (it2->second.seen != nullptr) {
        // AcceptOnce semantics: the optional tag may fire at most once.
        EXPECT_FALSE(*it2->second.seen);
        *it2->second.seen = true;
      }
      if (!ignore_ok) {
        EXPECT_EQ(it2->second.ok, ok);
      }
    } else {
      // Fix: log the unexpected tag value itself, not the address of the
      // tag() helper function (original passed `tag`, not `got_tag`).
      gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
      abort();
    }
  }
}
// An optional expectation registered via AcceptOnce(): 'ok' is the value the
// completion-queue event must carry if the tag shows up, and 'seen' (may be
// null) is set to true when the tag is actually delivered.
struct MaybeExpect {
  bool ok;
  bool* seen;
};
std::map<void*, bool> expectations_; std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_; bool spin_;
}; };
@ -539,31 +568,19 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
cli_stream->Write(send_request, tag(3)); cli_stream->Write(send_request, tag(3));
// 65536(64KB) is the default flow control window size. Should change this bool seen3 = false;
// number when default flow control window size changes. For the write of
// send_request larger than the flow control window size, tag:3 will not come Verifier(GetParam().disable_blocking)
// up until server read is initiated. For write of send_request smaller than .Expect(2, true)
// the flow control window size, the request can take the free ride with .AcceptOnce(3, true, &seen3)
// initial metadata due to coalescing, thus write tag:3 will come up here. .Verify(cq_.get());
if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Expect(3, true)
.Verify(cq_.get());
} else {
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
}
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
if (GetParam().message_content.length() < 65536 || GetParam().inproc) { Verifier(GetParam().disable_blocking)
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); .AcceptOnce(3, true, &seen3)
} else { .Expect(4, true)
Verifier(GetParam().disable_blocking) .Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
}
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
@ -588,6 +605,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
EXPECT_TRUE(seen3);
} }
// One ping, two pongs. // One ping, two pongs.
@ -834,31 +852,19 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
// 65536(64KB) is the default flow control window size. Should change this bool seen3 = false;
// number when default flow control window size changes. For the write of
// send_request larger than the flow control window size, tag:3 will not come Verifier(GetParam().disable_blocking)
// up until server read is initiated. For write of send_request smaller than .Expect(2, true)
// the flow control window size, the request can take the free ride with .AcceptOnce(3, true, &seen3)
// initial metadata due to coalescing, thus write tag:3 will come up here. .Verify(cq_.get());
if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Expect(3, true)
.Verify(cq_.get());
} else {
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
}
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
if (GetParam().message_content.length() < 65536 || GetParam().inproc) { Verifier(GetParam().disable_blocking)
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); .AcceptOnce(3, true, &seen3)
} else { .Expect(4, true)
Verifier(GetParam().disable_blocking) .Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
}
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_stream.Read(&recv_request, tag(5)); srv_stream.Read(&recv_request, tag(5));
@ -877,6 +883,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
EXPECT_TRUE(seen3);
} }
// One ping, one pong. Using server:WriteLast api // One ping, one pong. Using server:WriteLast api
@ -902,31 +909,19 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
// 65536(64KB) is the default flow control window size. Should change this bool seen3 = false;
// number when default flow control window size changes. For the write of
// send_request larger than the flow control window size, tag:3 will not come Verifier(GetParam().disable_blocking)
// up until server read is initiated. For write of send_request smaller than .Expect(2, true)
// the flow control window size, the request can take the free ride with .AcceptOnce(3, true, &seen3)
// initial metadata due to coalescing, thus write tag:3 will come up here. .Verify(cq_.get());
if (GetParam().message_content.length() < 65536 || GetParam().inproc) {
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Expect(3, true)
.Verify(cq_.get());
} else {
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
}
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
if (GetParam().message_content.length() < 65536 || GetParam().inproc) { Verifier(GetParam().disable_blocking)
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); .AcceptOnce(3, true, &seen3)
} else { .Expect(4, true)
Verifier(GetParam().disable_blocking) .Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
}
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_stream.Read(&recv_request, tag(5)); srv_stream.Read(&recv_request, tag(5));
@ -947,6 +942,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
EXPECT_TRUE(seen3);
} }
// Metadata tests // Metadata tests

Loading…
Cancel
Save