Reflow to remove bool

pull/12915/head
Craig Tiller 7 years ago
parent e3493e5c30
commit 96582b7f5e
  1. 16
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 92
      src/core/ext/transport/chttp2/transport/flow_control.cc
  3. 19
      src/core/ext/transport/chttp2/transport/flow_control.h
  4. 11
      src/core/lib/transport/bdp_estimator.h

@ -281,7 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
t->flow_control.Init(t);
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@ -389,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
bool enable_bdp = true;
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@ -449,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->flow_control->SetBdpProbe(
grpc_channel_arg_get_bool(&channel_args->args[i], true));
enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
@ -545,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
t->flow_control.Init(exec_ctx, t, enable_bdp);
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
t->ping_state.is_delayed_ping_timer_set = false;
@ -565,13 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
if (t->flow_control->bdp_probe()) {
if (enable_bdp) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(exec_ctx, t);
}
grpc_chttp2_act_on_flowctl_action(
exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
grpc_chttp2_act_on_flowctl_action(
exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
}
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);

@ -147,8 +147,21 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
gpr_free(mf_str);
}
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t)
: t_(t), bdp_estimator_(t->peer_string) {}
TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx,
const grpc_chttp2_transport* t,
bool enable_bdp_probe)
: t_(t),
enable_bdp_probe_(enable_bdp_probe),
bdp_estimator_(t->peer_string),
last_pid_update_(grpc_exec_ctx_now(exec_ctx)),
pid_controller_(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)
.set_gain_d(0)
.set_initial_control_value(TargetLogBdp())
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10)) {}
uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
FlowControlTrace trace("t updt sent", this, nullptr);
@ -287,26 +300,19 @@ static double AdjustForMemoryPressure(grpc_resource_quota* quota,
return target;
}
double TransportFlowControl::TargetLogBdp() {
return AdjustForMemoryPressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
1 + log2(bdp_estimator_.EstimateBdp()));
}
double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx,
double value) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
if (!pid_controller_initialized_) {
last_pid_update_ = now;
pid_controller_initialized_ = true;
pid_controller_.Init(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)
.set_gain_d(0)
.set_initial_control_value(value)
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10));
return value;
}
double bdp_error = value - pid_controller_->last_control_value();
double bdp_error = value - pid_controller_.last_control_value();
const double dt = (double)(now - last_pid_update_) * 1e-3;
last_pid_update_ = now;
return pid_controller_->Update(bdp_error, dt);
return pid_controller_.Update(bdp_error, dt);
}
FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
@ -326,39 +332,29 @@ FlowControlAction TransportFlowControl::PeriodicUpdate(
FlowControlAction action;
if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
int64_t estimate = -1;
if (bdp_estimator_.EstimateBdp(&estimate)) {
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
const double target =
pow(2, SmoothLogBdp(exec_ctx,
AdjustForMemoryPressure(
grpc_resource_user_quota(
grpc_endpoint_get_resource_user(t_->ep)),
1 + log2((double)estimate))));
// Though initial window 'could' drop to 0, we keep the floor at 128
target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
action.set_send_initial_window_update(
DeltaUrgency(target_initial_window_size_,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
target_initial_window_size_);
}
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp()));
// Though initial window 'could' drop to 0, we keep the floor at 128
target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
action.set_send_initial_window_update(
DeltaUrgency(target_initial_window_size_,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
target_initial_window_size_);
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1;
if (bdp_estimator_.EstimateBandwidth(&bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size = (int32_t)GPR_CLAMP(
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
target_initial_window_size_),
16384, 16777215);
action.set_send_max_frame_size_update(
DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
frame_size);
}
double bw_dbl = bdp_estimator_.EstimateBandwidth();
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size = (int32_t)GPR_CLAMP(
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
target_initial_window_size_),
16384, 16777215);
action.set_send_max_frame_size_update(
DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
frame_size);
}
return UpdateAction(action);
}

@ -127,16 +127,9 @@ class FlowControlTrace {
class TransportFlowControl {
public:
TransportFlowControl(const grpc_chttp2_transport* t);
~TransportFlowControl() {
if (pid_controller_initialized_) {
pid_controller_.Destroy();
}
}
// toggle bdp probing
// TODO(ctiller): make this safe to dynamically toggle
void SetBdpProbe(bool enable) { enable_bdp_probe_ = enable; }
TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t,
bool enable_bdp_probe);
~TransportFlowControl() {}
bool bdp_probe() const { return enable_bdp_probe_; }
@ -210,6 +203,7 @@ class TransportFlowControl {
}
private:
double TargetLogBdp();
double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
FlowControlAction::Urgency DeltaUrgency(int32_t value,
grpc_chttp2_setting_id setting_id);
@ -246,14 +240,13 @@ class TransportFlowControl {
int32_t target_initial_window_size_ = kDefaultWindow;
/** should we probe bdp? */
bool enable_bdp_probe_ = true;
const bool enable_bdp_probe_;
/* bdp estimation */
grpc_core::BdpEstimator bdp_estimator_;
/* pid controller */
bool pid_controller_initialized_ = false;
grpc_core::ManualConstructor<grpc_core::PidController> pid_controller_;
grpc_core::PidController pid_controller_;
grpc_millis last_pid_update_ = 0;
};

@ -40,15 +40,8 @@ class BdpEstimator {
explicit BdpEstimator(const char *name);
~BdpEstimator() {}
// Returns true if a reasonable estimate could be obtained
bool EstimateBdp(int64_t *estimate_out) const {
*estimate_out = estimate_;
return true;
}
bool EstimateBandwidth(double *bw_out) const {
*bw_out = bw_est_;
return true;
}
int64_t EstimateBdp() const { return estimate_; }
double EstimateBandwidth() const { return bw_est_; }
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }

Loading…
Cancel
Save