Pull bdp estimation into flowctl module

pull/11831/head
ncteisen 8 years ago
parent a27680b597
commit 41ba268c20
  1. 116
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 169
      src/core/ext/transport/chttp2/transport/flow_control.c
  3. 32
      src/core/ext/transport/chttp2/transport/internal.h
  4. 10
      src/core/lib/transport/bdp_estimator.c
  5. 10
      src/core/lib/transport/bdp_estimator.h
  6. 7
      test/core/transport/bdp_estimator_test.c

@ -304,10 +304,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t, keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); t->flow_control.last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_pid_controller_init( grpc_pid_controller_init(
&t->pid_controller, &t->flow_control.pid_controller,
(grpc_pid_controller_args){.gain_p = 4, (grpc_pid_controller_args){.gain_p = 4,
.gain_i = 8, .gain_i = 8,
.gain_d = 0, .gain_d = 0,
@ -340,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0; t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW; t->write_buffer_size = DEFAULT_WINDOW;
t->enable_bdp_probe = true; t->flow_control.enable_bdp_probe = true;
if (is_client) { if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@ -457,7 +457,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE}); (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 == } else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->enable_bdp_probe = grpc_channel_arg_get_integer( t->flow_control.enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1}); &channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key, } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) { GRPC_ARG_KEEPALIVE_TIME_MS)) {
@ -2253,46 +2253,27 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
break; break;
} }
if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (action.initial_window_size > 0) {
queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)action.initial_window_size);
} }
if (action.max_frame_size > 0) {
static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
double bdp_dbl) { (uint32_t)action.max_frame_size);
// initial window size bounded [1,2^31-1], but we set the min to 128.
int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
int64_t delta =
(int64_t)bdp -
(int64_t)t->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) {
return;
} }
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) || if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
GRPC_TRACER_ON(grpc_flowctl_trace)) { grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update");
gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d",
t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp);
} }
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)bdp);
}
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bw_dbl, double bdp_dbl) {
int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX);
int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp);
// frame size is bounded [2^14,2^24-1]
int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
int64_t delta = (int64_t)frame_size -
(int64_t)t->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) {
return;
} }
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { if (action.need_ping) {
gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string, GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
(int)frame_size); grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
} }
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
(uint32_t)frame_size);
} }
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@ -2330,7 +2311,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0); GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
bool need_bdp_ping = false;
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
@ -2349,11 +2329,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE}; GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
if (grpc_bdp_estimator_add_incoming_bytes( grpc_bdp_estimator_add_incoming_bytes(
&t->bdp_estimator, &t->flow_control.bdp_estimator,
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) { (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
need_bdp_ping = true;
}
errors[1] = errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
} }
@ -2400,45 +2378,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) { if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked); &t->read_action_locked);
grpc_chttp2_act_on_flowctl_action(
if (t->enable_bdp_probe) { exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t,
if (need_bdp_ping) { NULL);
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
}
int64_t estimate = -1;
double bdp_guess = -1;
if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
double target = 1 + log2((double)estimate);
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
if (memory_pressure > 0.8) {
target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
}
double bdp_error =
target - grpc_pid_controller_last(&t->pid_controller);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 0.1) {
dt = 0.1;
}
double log2_bdp_guess =
grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
bdp_guess = pow(2, log2_bdp_guess);
update_bdp(exec_ctx, t, bdp_guess);
t->last_pid_update = now;
}
double bw = -1;
if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) {
update_frame(exec_ctx, t, bw, bdp_guess);
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else { } else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@ -2461,7 +2403,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
} }
grpc_bdp_estimator_start_ping(&t->bdp_estimator); grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator);
} }
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@ -2470,7 +2412,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (GRPC_TRACER_ON(grpc_http_trace)) { if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
} }
grpc_bdp_estimator_complete_ping(&t->bdp_estimator); grpc_bdp_estimator_complete_ping(&t->flow_control.bdp_estimator);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
} }

@ -18,6 +18,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include <math.h>
#include <string.h> #include <string.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
@ -39,6 +40,8 @@ typedef struct {
int64_t remote_window_delta; int64_t remote_window_delta;
int64_t local_window_delta; int64_t local_window_delta;
int64_t announced_window_delta; int64_t announced_window_delta;
uint32_t local_init_window;
uint32_t local_max_frame;
} shadow_flow_control; } shadow_flow_control;
static void pretrace(shadow_flow_control* shadow_fc, static void pretrace(shadow_flow_control* shadow_fc,
@ -54,14 +57,28 @@ static void pretrace(shadow_flow_control* shadow_fc,
} }
} }
static char* fmt_str(int64_t old, int64_t new) { #define TRACE_PADDING 30
static char* fmt_int64_diff_str(int64_t old, int64_t new) {
char* str; char* str;
if (old != new) { if (old != new) {
gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
} else { } else {
gpr_asprintf(&str, "%" PRId64 "", old); gpr_asprintf(&str, "%" PRId64 "", old);
} }
char* str_lp = gpr_leftpad(str, ' ', 30); char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
return str_lp;
}
static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) {
char* str;
if (new > 0 && old != new) {
gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new);
} else {
gpr_asprintf(&str, "%" PRIu32 "", old);
}
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str); gpr_free(str);
return str_lp; return str_lp;
} }
@ -75,24 +92,28 @@ static void posttrace(shadow_flow_control* shadow_fc,
uint32_t remote_window = uint32_t remote_window =
tfc->t->settings[GRPC_PEER_SETTINGS] tfc->t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window); char* trw_str =
char* tlw_str = fmt_str(shadow_fc->target_window, fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc)); grpc_chttp2_target_announced_window(tfc));
char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window); char* taw_str =
fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str; char* srw_str;
char* slw_str; char* slw_str;
char* saw_str; char* saw_str;
if (sfc != NULL) { if (sfc != NULL) {
srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window, srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
sfc->remote_window_delta + remote_window); sfc->remote_window_delta + remote_window);
slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window, slw_str =
fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
sfc->local_window_delta + acked_local_window); sfc->local_window_delta + acked_local_window);
saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window, saw_str = fmt_int64_diff_str(
shadow_fc->announced_window_delta + acked_local_window,
sfc->announced_window_delta + acked_local_window); sfc->announced_window_delta + acked_local_window);
} else { } else {
srw_str = gpr_leftpad("", ' ', 30); srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
slw_str = gpr_leftpad("", ' ', 30); slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
saw_str = gpr_leftpad("", ' ', 30); saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
} }
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
@ -120,10 +141,21 @@ static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
} }
static void trace_action(grpc_chttp2_flowctl_action action) { static void trace_action(grpc_chttp2_transport_flowctl* tfc,
gpr_log(GPR_DEBUG, "transport: %s, stream: %s", grpc_chttp2_flowctl_action action) {
char* iw_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
action.initial_window_size);
char* mf_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
action.max_frame_size);
gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
urgency_to_string(action.send_transport_update), urgency_to_string(action.send_transport_update),
urgency_to_string(action.send_stream_update)); urgency_to_string(action.send_stream_update),
urgency_to_string(action.send_setting_update), iw_str, mf_str);
gpr_free(iw_str);
gpr_free(mf_str);
} }
#define PRETRACE(tfc, sfc) \ #define PRETRACE(tfc, sfc) \
@ -131,11 +163,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) {
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
#define POSTTRACE(tfc, sfc, reason) \ #define POSTTRACE(tfc, sfc, reason) \
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action)) #define TRACEACTION(tfc, action) \
GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
#else #else
#define PRETRACE(tfc, sfc) #define PRETRACE(tfc, sfc)
#define POSTTRACE(tfc, sfc, reason) #define POSTTRACE(tfc, sfc, reason)
#define TRACEACTION(action) #define TRACEACTION(tfc, action)
#endif #endif
/* How many bytes of incoming flow control would we like to advertise */ /* How many bytes of incoming flow control would we like to advertise */
@ -342,15 +375,58 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
announced_window_delta_preupdate(tfc, sfc); announced_window_delta_preupdate(tfc, sfc);
} }
// Returns an urgency with which to make an update
static grpc_chttp2_flowctl_urgency delta_is_significant(
const grpc_chttp2_transport_flowctl* tfc, int32_t value,
grpc_chttp2_setting_id setting_id) {
int64_t delta = (int64_t)value -
(int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} else {
return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
}
}
// Takes in a target and uses the pid controller to return a stabilized
// guess at the new bdp.
static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc,
double target) {
double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 0.1) {
dt = 0.1;
}
double log2_bdp_guess =
grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
tfc->last_pid_update = now;
return pow(2, log2_bdp_guess);
}
// Take in a target and modifies it based on the memory pressure of the system
static double get_target_under_memory_pressure(
grpc_chttp2_transport_flowctl* tfc, double target) {
// do not increase window under heavy memory pressure.
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
if (memory_pressure > 0.8) {
target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
}
return target;
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
const grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action; grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action)); memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) { if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
} }
// TODO(ncteisen): tune this
if (sfc != NULL && !sfc->s->read_closed) { if (sfc != NULL && !sfc->s->read_closed) {
uint32_t sent_init_window = uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS] tfc->t->settings[GRPC_SENT_SETTINGS]
@ -364,6 +440,61 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} }
} }
TRACEACTION(action); TRACEACTION(tfc, action);
return action;
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
grpc_chttp2_transport_flowctl* tfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
if (tfc->enable_bdp_probe) {
action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator);
// get bdp estimate and update initial_window accordingly.
int64_t estimate = -1;
int32_t bdp = -1;
if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) {
double target = 1 + log2((double)estimate);
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
target = get_target_under_memory_pressure(tfc, target);
// run our target through the pid controller to stabilize change.
// TODO(ncteisen): experiment with other controllers here.
double bdp_guess = get_pid_controller_guess(tfc, target);
// Though initial window 'could' drop to 0, we keep the floor at 128
bdp = GPR_MAX((int32_t)bdp_guess, 128);
grpc_chttp2_flowctl_urgency init_window_update_urgency =
delta_is_significant(tfc, bdp,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
action.send_setting_update = init_window_update_urgency;
action.initial_window_size = (uint32_t)bdp;
}
}
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1;
if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size =
GPR_CLAMP(GPR_MAX((int32_t)bw_dbl / 1000, bdp), 16384, 16777215);
grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (frame_size_urgency > action.send_setting_update) {
action.send_setting_update = frame_size_urgency;
}
action.max_frame_size = (uint32_t)frame_size;
}
}
}
TRACEACTION(tfc, action);
return action; return action;
} }

@ -238,7 +238,17 @@ typedef struct {
* to send WINDOW_UPDATE frames. */ * to send WINDOW_UPDATE frames. */
int64_t announced_window; int64_t announced_window;
// read only pointer back to transport for certain data /** should we probe bdp? */
bool enable_bdp_probe;
/* bdp estimation */
grpc_bdp_estimator bdp_estimator;
/* pid controller */
grpc_pid_controller pid_controller;
gpr_timespec last_pid_update;
// pointer back to transport for tracing
const grpc_chttp2_transport *t; const grpc_chttp2_transport *t;
} grpc_chttp2_transport_flowctl; } grpc_chttp2_transport_flowctl;
@ -261,9 +271,6 @@ struct grpc_chttp2_transport {
/** is there a read request to the endpoint outstanding? */ /** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading; uint8_t endpoint_reading;
/** should we probe bdp? */
bool enable_bdp_probe;
grpc_chttp2_optimization_target opt_target; grpc_chttp2_optimization_target opt_target;
/** various lists of streams */ /** various lists of streams */
@ -358,13 +365,6 @@ struct grpc_chttp2_transport {
grpc_chttp2_transport_flowctl flow_control; grpc_chttp2_transport_flowctl flow_control;
/* bdp estimation */
grpc_bdp_estimator bdp_estimator;
/* pid controller */
grpc_pid_controller pid_controller;
gpr_timespec last_pid_update;
/* deframing */ /* deframing */
grpc_chttp2_deframe_transport_state deframe_state; grpc_chttp2_deframe_transport_state deframe_state;
uint8_t incoming_frame_type; uint8_t incoming_frame_type;
@ -704,13 +704,19 @@ typedef enum {
typedef struct { typedef struct {
grpc_chttp2_flowctl_urgency send_stream_update; grpc_chttp2_flowctl_urgency send_stream_update;
grpc_chttp2_flowctl_urgency send_transport_update; grpc_chttp2_flowctl_urgency send_transport_update;
grpc_chttp2_flowctl_urgency send_setting_update;
uint32_t initial_window_size;
uint32_t max_frame_size;
bool need_ping;
} grpc_chttp2_flowctl_action; } grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell // Reads the flow control data and returns and actionable struct that will tell
// chttp2 exactly what it needs to do // chttp2 exactly what it needs to do
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
const grpc_chttp2_stream_flowctl *sfc);
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
grpc_chttp2_transport_flowctl *tfc);
// Takes in a flow control action and performs all the needed operations. // Takes in a flow control action and performs all the needed operations.
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,

@ -33,20 +33,24 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
estimator->bw_est = 0; estimator->bw_est = 0;
} }
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate) { int64_t *estimate) {
*estimate = estimator->estimate; *estimate = estimator->estimate;
return true; return true;
} }
bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) { bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
double *bw) {
*bw = estimator->bw_est; *bw = estimator->bw_est;
return true; return true;
} }
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) { int64_t num_bytes) {
estimator->accumulator += num_bytes; estimator->accumulator += num_bytes;
}
bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) {
switch (estimator->ping_state) { switch (estimator->ping_state) {
case GRPC_BDP_PING_UNSCHEDULED: case GRPC_BDP_PING_UNSCHEDULED:
return true; return true;

@ -47,13 +47,15 @@ typedef struct grpc_bdp_estimator {
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
// Returns true if a reasonable estimate could be obtained // Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate); int64_t *estimate);
// Returns true if a reasonable estimate could be obtained // Tracks new bytes read.
bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw); bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw);
// Returns true if the user should schedule a ping // Returns true if the user should schedule a ping
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes); int64_t num_bytes);
// Returns true if the user should schedule a ping
bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator);
// Schedule a ping: call in response to receiving a true from // Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started) // transport (but not necessarily started)

@ -43,12 +43,13 @@ static void test_get_estimate_no_samples(void) {
static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples, static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples,
size_t n) { size_t n) {
GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567) == true); grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567);
GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == true);
grpc_bdp_estimator_schedule_ping(estimator); grpc_bdp_estimator_schedule_ping(estimator);
grpc_bdp_estimator_start_ping(estimator); grpc_bdp_estimator_start_ping(estimator);
for (size_t i = 0; i < n; i++) { for (size_t i = 0; i < n; i++) {
GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) == grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]);
false); GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == false);
} }
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1, GPR_TIMESPAN))); gpr_time_from_millis(1, GPR_TIMESPAN)));

Loading…
Cancel
Save