Merge pull request #11831 from ncteisen/flow-control-part2

Flow Control Part 2: BDP
reviewable/pr12024/r9^2
Noah Eisen 7 years ago committed by GitHub
commit d1ba31d134
  1. 118
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 169
      src/core/ext/transport/chttp2/transport/flow_control.c
  3. 32
      src/core/ext/transport/chttp2/transport/internal.h
  4. 10
      src/core/lib/transport/bdp_estimator.c
  5. 10
      src/core/lib/transport/bdp_estimator.h
  6. 7
      test/core/transport/bdp_estimator_test.c

@ -304,10 +304,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string);
t->flow_control.last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_pid_controller_init(
&t->pid_controller,
&t->flow_control.pid_controller,
(grpc_pid_controller_args){.gain_p = 4,
.gain_i = 8,
.gain_d = 0,
@ -340,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW;
t->enable_bdp_probe = true;
t->flow_control.enable_bdp_probe = true;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@ -457,7 +457,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->enable_bdp_probe = grpc_channel_arg_get_integer(
t->flow_control.enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
@ -2253,46 +2253,27 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
break;
}
}
static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bdp_dbl) {
// initial window size bounded [1,2^31-1], but we set the min to 128.
int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
int64_t delta =
(int64_t)bdp -
(int64_t)t->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) {
return;
if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (action.initial_window_size > 0) {
queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)action.initial_window_size);
}
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) ||
GRPC_TRACER_ON(grpc_flowctl_trace)) {
gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d",
t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp);
if (action.max_frame_size > 0) {
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
(uint32_t)action.max_frame_size);
}
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)bdp);
}
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bw_dbl, double bdp_dbl) {
int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX);
int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp);
// frame size is bounded [2^14,2^24-1]
int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
int64_t delta = (int64_t)frame_size -
(int64_t)t->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) {
return;
if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update");
}
if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string,
(int)frame_size);
}
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
(uint32_t)frame_size);
if (action.need_ping) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
}
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@ -2330,7 +2311,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
bool need_bdp_ping = false;
GRPC_ERROR_REF(error);
@ -2349,11 +2329,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
if (grpc_bdp_estimator_add_incoming_bytes(
&t->bdp_estimator,
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) {
need_bdp_ping = true;
}
grpc_bdp_estimator_add_incoming_bytes(
&t->flow_control.bdp_estimator,
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
}
@ -2400,45 +2378,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
if (t->enable_bdp_probe) {
if (need_bdp_ping) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
&t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
}
int64_t estimate = -1;
double bdp_guess = -1;
if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
double target = 1 + log2((double)estimate);
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
if (memory_pressure > 0.8) {
target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
}
double bdp_error =
target - grpc_pid_controller_last(&t->pid_controller);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 0.1) {
dt = 0.1;
}
double log2_bdp_guess =
grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
bdp_guess = pow(2, log2_bdp_guess);
update_bdp(exec_ctx, t, bdp_guess);
t->last_pid_update = now;
}
double bw = -1;
if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) {
update_frame(exec_ctx, t, bw, bdp_guess);
}
}
grpc_chttp2_act_on_flowctl_action(
exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t,
NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@ -2461,7 +2403,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
grpc_bdp_estimator_start_ping(&t->bdp_estimator);
grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator);
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@ -2470,7 +2412,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
}
grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
grpc_bdp_estimator_complete_ping(&t->flow_control.bdp_estimator);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}

@ -18,6 +18,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include <math.h>
#include <string.h>
#include <grpc/support/alloc.h>
@ -39,6 +40,8 @@ typedef struct {
int64_t remote_window_delta;
int64_t local_window_delta;
int64_t announced_window_delta;
uint32_t local_init_window;
uint32_t local_max_frame;
} shadow_flow_control;
static void pretrace(shadow_flow_control* shadow_fc,
@ -54,14 +57,28 @@ static void pretrace(shadow_flow_control* shadow_fc,
}
}
static char* fmt_str(int64_t old, int64_t new) {
#define TRACE_PADDING 30
static char* fmt_int64_diff_str(int64_t old, int64_t new) {
char* str;
if (old != new) {
gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
} else {
gpr_asprintf(&str, "%" PRId64 "", old);
}
char* str_lp = gpr_leftpad(str, ' ', 30);
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
return str_lp;
}
static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) {
char* str;
if (new > 0 && old != new) {
gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new);
} else {
gpr_asprintf(&str, "%" PRIu32 "", old);
}
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
return str_lp;
}
@ -75,24 +92,28 @@ static void posttrace(shadow_flow_control* shadow_fc,
uint32_t remote_window =
tfc->t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str = fmt_str(shadow_fc->target_window,
char* trw_str =
fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc));
char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
char* taw_str =
fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str;
char* slw_str;
char* saw_str;
if (sfc != NULL) {
srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window,
srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
sfc->remote_window_delta + remote_window);
slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window,
slw_str =
fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
sfc->local_window_delta + acked_local_window);
saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window,
saw_str = fmt_int64_diff_str(
shadow_fc->announced_window_delta + acked_local_window,
sfc->announced_window_delta + acked_local_window);
} else {
srw_str = gpr_leftpad("", ' ', 30);
slw_str = gpr_leftpad("", ' ', 30);
saw_str = gpr_leftpad("", ' ', 30);
srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
}
gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
@ -120,10 +141,21 @@ static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
GPR_UNREACHABLE_CODE(return "unknown");
}
static void trace_action(grpc_chttp2_flowctl_action action) {
gpr_log(GPR_DEBUG, "transport: %s, stream: %s",
static void trace_action(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_flowctl_action action) {
char* iw_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
action.initial_window_size);
char* mf_str = fmt_uint32_diff_str(
tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
action.max_frame_size);
gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
urgency_to_string(action.send_transport_update),
urgency_to_string(action.send_stream_update));
urgency_to_string(action.send_stream_update),
urgency_to_string(action.send_setting_update), iw_str, mf_str);
gpr_free(iw_str);
gpr_free(mf_str);
}
#define PRETRACE(tfc, sfc) \
@ -131,11 +163,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) {
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
#define POSTTRACE(tfc, sfc, reason) \
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action))
#define TRACEACTION(tfc, action) \
GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
#else
#define PRETRACE(tfc, sfc)
#define POSTTRACE(tfc, sfc, reason)
#define TRACEACTION(action)
#define TRACEACTION(tfc, action)
#endif
/* How many bytes of incoming flow control would we like to advertise */
@ -342,15 +375,58 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
announced_window_delta_preupdate(tfc, sfc);
}
// Returns an urgency with which to make an update
static grpc_chttp2_flowctl_urgency delta_is_significant(
const grpc_chttp2_transport_flowctl* tfc, int32_t value,
grpc_chttp2_setting_id setting_id) {
int64_t delta = (int64_t)value -
(int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} else {
return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
}
}
// Takes in a target and uses the pid controller to return a stabilized
// guess at the new bdp.
static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc,
double target) {
double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update);
double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
if (dt > 0.1) {
dt = 0.1;
}
double log2_bdp_guess =
grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
tfc->last_pid_update = now;
return pow(2, log2_bdp_guess);
}
// Take in a target and modifies it based on the memory pressure of the system
static double get_target_under_memory_pressure(
grpc_chttp2_transport_flowctl* tfc, double target) {
// do not increase window under heavy memory pressure.
double memory_pressure = grpc_resource_quota_get_memory_pressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
if (memory_pressure > 0.8) {
target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
}
return target;
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl* tfc,
const grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
}
// TODO(ncteisen): tune this
if (sfc != NULL && !sfc->s->read_closed) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
@ -364,6 +440,61 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
}
}
TRACEACTION(action);
TRACEACTION(tfc, action);
return action;
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
grpc_chttp2_transport_flowctl* tfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
if (tfc->enable_bdp_probe) {
action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator);
// get bdp estimate and update initial_window accordingly.
int64_t estimate = -1;
int32_t bdp = -1;
if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) {
double target = 1 + log2((double)estimate);
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
target = get_target_under_memory_pressure(tfc, target);
// run our target through the pid controller to stabilize change.
// TODO(ncteisen): experiment with other controllers here.
double bdp_guess = get_pid_controller_guess(tfc, target);
// Though initial window 'could' drop to 0, we keep the floor at 128
bdp = GPR_MAX((int32_t)bdp_guess, 128);
grpc_chttp2_flowctl_urgency init_window_update_urgency =
delta_is_significant(tfc, bdp,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
action.send_setting_update = init_window_update_urgency;
action.initial_window_size = (uint32_t)bdp;
}
}
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1;
if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size =
GPR_CLAMP(GPR_MAX((int32_t)bw_dbl / 1000, bdp), 16384, 16777215);
grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (frame_size_urgency > action.send_setting_update) {
action.send_setting_update = frame_size_urgency;
}
action.max_frame_size = (uint32_t)frame_size;
}
}
}
TRACEACTION(tfc, action);
return action;
}

@ -238,7 +238,17 @@ typedef struct {
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
// read only pointer back to transport for certain data
/** should we probe bdp? */
bool enable_bdp_probe;
/* bdp estimation */
grpc_bdp_estimator bdp_estimator;
/* pid controller */
grpc_pid_controller pid_controller;
gpr_timespec last_pid_update;
// pointer back to transport for tracing
const grpc_chttp2_transport *t;
} grpc_chttp2_transport_flowctl;
@ -261,9 +271,6 @@ struct grpc_chttp2_transport {
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
/** should we probe bdp? */
bool enable_bdp_probe;
grpc_chttp2_optimization_target opt_target;
/** various lists of streams */
@ -358,13 +365,6 @@ struct grpc_chttp2_transport {
grpc_chttp2_transport_flowctl flow_control;
/* bdp estimation */
grpc_bdp_estimator bdp_estimator;
/* pid controller */
grpc_pid_controller pid_controller;
gpr_timespec last_pid_update;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
uint8_t incoming_frame_type;
@ -704,13 +704,19 @@ typedef enum {
typedef struct {
grpc_chttp2_flowctl_urgency send_stream_update;
grpc_chttp2_flowctl_urgency send_transport_update;
grpc_chttp2_flowctl_urgency send_setting_update;
uint32_t initial_window_size;
uint32_t max_frame_size;
bool need_ping;
} grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell
// chttp2 exactly what it needs to do
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl *tfc,
const grpc_chttp2_stream_flowctl *sfc);
grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
grpc_chttp2_transport_flowctl *tfc);
// Takes in a flow control action and performs all the needed operations.
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,

@ -33,20 +33,24 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
estimator->bw_est = 0;
}
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate) {
*estimate = estimator->estimate;
return true;
}
bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) {
bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
double *bw) {
*bw = estimator->bw_est;
return true;
}
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
estimator->accumulator += num_bytes;
}
bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) {
switch (estimator->ping_state) {
case GRPC_BDP_PING_UNSCHEDULED:
return true;

@ -47,13 +47,15 @@ typedef struct grpc_bdp_estimator {
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
// Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate);
// Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw);
// Tracks new bytes read.
bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw);
// Returns true if the user should schedule a ping
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes);
// Returns true if the user should schedule a ping
bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator);
// Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started)

@ -43,12 +43,13 @@ static void test_get_estimate_no_samples(void) {
static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples,
size_t n) {
GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567) == true);
grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567);
GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == true);
grpc_bdp_estimator_schedule_ping(estimator);
grpc_bdp_estimator_start_ping(estimator);
for (size_t i = 0; i < n; i++) {
GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) ==
false);
grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]);
GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == false);
}
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1, GPR_TIMESPAN)));

Loading…
Cancel
Save