resource quota and lb policy

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent 714e4c849f
commit 7a3388b077
  1. 5
      src/core/ext/filters/client_channel/lb_policy.cc
  2. 44
      src/core/lib/iomgr/resource_quota.cc

@ -105,9 +105,8 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
if (!exit_idle_called_) {
exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(),
grpc_combiner_scheduler(parent_->combiner())),
parent_->combiner()->Run(
GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr),
GRPC_ERROR_NONE);
}
PickResult result;

@ -293,7 +293,8 @@ static void rq_step_sched(grpc_resource_quota* resource_quota) {
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_ref_internal(resource_quota);
GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
resource_quota->combiner->FinallyRun(&resource_quota->rq_step_closure,
GRPC_ERROR_NONE);
}
/* update the atomically available resource estimate - use no barriers since
@ -655,10 +656,9 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
(intptr_t)resource_quota);
}
GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
grpc_combiner_finally_scheduler(resource_quota->combiner));
nullptr);
GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota,
grpc_combiner_scheduler(resource_quota->combiner));
rq_reclamation_done, resource_quota, nullptr);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = nullptr;
}
@ -774,19 +774,15 @@ grpc_resource_user* grpc_resource_user_create(
resource_user->resource_quota =
grpc_resource_quota_ref_internal(resource_quota);
GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
resource_user, nullptr);
GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
&ru_add_to_free_pool, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
&ru_add_to_free_pool, resource_user, nullptr);
GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
&ru_post_benign_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
&ru_post_benign_reclaimer, resource_user, nullptr);
GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
&ru_post_destructive_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
&ru_post_destructive_reclaimer, resource_user, nullptr);
GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
nullptr);
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@ -827,7 +823,8 @@ static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
resource_user->resource_quota->combiner->Run(
&resource_user->destroy_closure, GRPC_ERROR_NONE);
}
}
@ -841,10 +838,8 @@ void grpc_resource_user_unref(grpc_resource_user* resource_user) {
void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(
ru_shutdown, resource_user,
grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
resource_user->resource_quota->combiner->Run(
GRPC_CLOSURE_CREATE(ru_shutdown, resource_user, nullptr),
GRPC_ERROR_NONE);
}
}
@ -902,7 +897,8 @@ static bool resource_user_alloc_locked(grpc_resource_user* resource_user,
}
if (!resource_user->allocating) {
resource_user->allocating = true;
GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
resource_user->resource_quota->combiner->Run(
&resource_user->allocate_closure, GRPC_ERROR_NONE);
}
return false;
}
@ -957,8 +953,8 @@ void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
GRPC_ERROR_NONE);
resource_quota->combiner->Run(&resource_user->add_to_free_pool_closure,
GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(resource_user, static_cast<gpr_atm>(size));
@ -969,8 +965,8 @@ void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
grpc_closure* closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
resource_user->new_reclaimers[destructive] = closure;
GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE);
resource_user->resource_quota->combiner->Run(
&resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE);
}
void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
@ -978,7 +974,7 @@ void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
GRPC_CLOSURE_SCHED(
resource_user->resource_quota->combiner->Run(
&resource_user->resource_quota->rq_reclamation_done_closure,
GRPC_ERROR_NONE);
}

Loading…
Cancel
Save