PID controller stabilization

pull/9511/head
Craig Tiller 8 years ago
parent c0118b494e
commit 91f77eacce
  1. 59
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 6
      src/core/ext/transport/chttp2/transport/internal.h
  3. 34
      src/core/lib/transport/bdp_estimator.c
  4. 5
      src/core/lib/transport/bdp_estimator.h

@ -252,8 +252,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_bdp_estimator_init(&t->bdp_estimator);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC);
t->last_pid_update = t->last_bdp_ping_finished;
grpc_pid_controller_init(&t->pid_controller, 16, 8, 0);
t->bdp_guess = DEFAULT_WINDOW;
grpc_pid_controller_init(&t->pid_controller, 4, 4, 0);
t->log2_bdp_guess = log2(DEFAULT_WINDOW);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@ -1897,26 +1897,29 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
int64_t estimate = -1;
double bdp_error = 0.0;
if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
bdp_error = 2.0 * (double)estimate - t->bdp_guess;
}
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 3) {
grpc_pid_controller_reset(&t->pid_controller);
}
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_get_quota(grpc_endpoint_get_resource_user(t->ep)));
if (memory_pressure > 0.8) {
bdp_error -= GPR_MAX(0, t->bdp_guess) * (memory_pressure - 0.8) / 0.2;
}
if (t->bdp_guess < 1e-6 && bdp_error < 0) {
bdp_error = 0;
double target = (double)estimate;
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_get_quota(grpc_endpoint_get_resource_user(t->ep)));
if (memory_pressure > 0.8) {
target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
}
bdp_error = target > 0 ? log2(target) - t->log2_bdp_guess
: GPR_MIN(0, -t->log2_bdp_guess);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 3) {
grpc_pid_controller_reset(&t->pid_controller);
}
t->log2_bdp_guess +=
grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
t->log2_bdp_guess = GPR_CLAMP(t->log2_bdp_guess, -5, 21);
gpr_log(GPR_DEBUG, "%s: err=%lf cur=%lf pressure=%lf target=%lf",
t->peer_string, bdp_error, t->log2_bdp_guess, memory_pressure,
target);
update_bdp(exec_ctx, t, pow(2, t->log2_bdp_guess));
t->last_pid_update = now;
}
t->bdp_guess +=
grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
update_bdp(exec_ctx, t, t->bdp_guess);
t->last_pid_update = now;
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@ -1929,21 +1932,23 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
grpc_bdp_estimator_start_ping(&t->bdp_estimator);
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
grpc_bdp_estimator_start_ping(&t->bdp_estimator);
}
/*******************************************************************************
* CALLBACK LOOP
*/

@ -82,8 +82,8 @@ typedef enum {
} grpc_chttp2_ping_type;
typedef enum {
GRPC_CHTTP2_PCL_NEXT = 0,
GRPC_CHTTP2_PCL_INITIATE,
GRPC_CHTTP2_PCL_INITIATE = 0,
GRPC_CHTTP2_PCL_NEXT,
GRPC_CHTTP2_PCL_INFLIGHT,
GRPC_CHTTP2_PCL_COUNT /* must be last */
} grpc_chttp2_ping_closure_list;
@ -330,7 +330,7 @@ struct grpc_chttp2_transport {
/* bdp estimator */
grpc_bdp_estimator bdp_estimator;
grpc_pid_controller pid_controller;
double bdp_guess;
double log2_bdp_guess;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
gpr_timespec last_bdp_ping_finished;

@ -39,33 +39,16 @@
#include <grpc/support/useful.h>
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator) {
estimator->num_samples = 0;
estimator->first_sample_idx = 0;
estimator->estimate = 65536;
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
}
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
int64_t *estimate) {
if (estimator->num_samples < GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE) {
return false;
}
*estimate = -1;
for (uint8_t i = 0; i < estimator->num_samples; i++) {
*estimate = GPR_MAX(
*estimate,
estimator
->samples[(estimator->first_sample_idx + i) % GRPC_BDP_SAMPLES]);
}
*estimate = estimator->estimate;
return true;
}
static int64_t *sampling(grpc_bdp_estimator *estimator) {
return &estimator
->samples[(estimator->first_sample_idx + estimator->num_samples) %
GRPC_BDP_SAMPLES];
}
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
switch (estimator->ping_state) {
@ -74,7 +57,7 @@ bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
case GRPC_BDP_PING_SCHEDULED:
return false;
case GRPC_BDP_PING_STARTED:
*sampling(estimator) += num_bytes;
estimator->accumulator += num_bytes;
return false;
}
GPR_UNREACHABLE_CODE(return false);
@ -88,15 +71,14 @@ void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
estimator->ping_state = GRPC_BDP_PING_STARTED;
if (estimator->num_samples == GRPC_BDP_SAMPLES) {
estimator->first_sample_idx++;
estimator->num_samples--;
}
*sampling(estimator) = 0;
estimator->accumulator = 0;
}
void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
estimator->num_samples++;
if (estimator->accumulator > 2 * estimator->estimate / 3) {
estimator->estimate *= 2;
gpr_log(GPR_DEBUG, "est --> %" PRId64, estimator->estimate);
}
estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
}

@ -47,10 +47,9 @@ typedef enum {
} grpc_bdp_estimator_ping_state;
typedef struct grpc_bdp_estimator {
uint8_t num_samples;
uint8_t first_sample_idx;
grpc_bdp_estimator_ping_state ping_state;
int64_t samples[GRPC_BDP_SAMPLES];
int64_t accumulator;
int64_t estimate;
} grpc_bdp_estimator;
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator);

Loading…
Cancel
Save