pull/12915/head
Craig Tiller 7 years ago
parent 4bbd68b208
commit e4b893b1a6
  1. 125
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 148
      src/core/ext/transport/chttp2/transport/flow_control.cc
  3. 53
      src/core/ext/transport/chttp2/transport/flow_control.h

@ -280,7 +280,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
t->flow_control.Init();
t->flow_control.Init(t);
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@ -320,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
t->flow_control.bdp_estimator.Init(t->peer_string);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@ -345,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW;
t->flow_control.enable_bdp_probe = true;
t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@ -451,8 +448,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
t->flow_control.enable_bdp_probe =
grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1});
t->flow_control->SetBdpProbe(
grpc_channel_arg_get_bool(&channel_args->args[i], true));
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
@ -570,9 +567,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
schedule_bdp_ping_locked(exec_ctx, t);
grpc_chttp2_act_on_flowctl_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
NULL);
exec_ctx, t->flow_control->MakeAction(exec_ctx), t, NULL);
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
@ -708,7 +703,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
s->flow_control.s = s;
s->flow_control.Init(t->flow_control.get(), s);
GPR_TIMER_END("init_stream", 0);
return 0;
@ -759,7 +754,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
s->flow_control.Destroy();
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@ -1626,13 +1621,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->id != 0) {
if (!s->read_closed) {
already_received = s->frame_storage.length;
grpc_chttp2_flowctl_incoming_bs_update(
&t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
already_received);
s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
already_received);
grpc_chttp2_act_on_flowctl_action(
exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
&s->flow_control),
t, s);
exec_ctx, s->flow_control->MakeAction(exec_ctx), t, s);
}
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@ -2399,49 +2391,44 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING
*/
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
grpc_chttp2_flowctl_action action,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
switch (action.send_stream_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
break;
}
switch (action.send_transport_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
grpc_chttp2_initiate_write(
exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
template <class F>
static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_core::chttp2::FlowControlAction::Urgency urgency,
grpc_chttp2_initiate_write_reason reason, F action) {
switch (urgency) {
case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
// this is the same as no action b/c every time the transport enters the
// writing path it will maybe do an update
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
grpc_chttp2_initiate_write(exec_ctx, t, reason);
// fallthrough
case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
action();
break;
}
if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (action.initial_window_size > 0) {
queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)action.initial_window_size);
}
if (action.max_frame_size > 0) {
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
(uint32_t)action.max_frame_size);
}
if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
}
}
}
void grpc_chttp2_act_on_flowctl_action(
grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
WithUrgency(
exec_ctx, t, action.send_stream_update(),
GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
[exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
WithUrgency(exec_ctx, t, action.send_transport_update(),
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
WithUrgency(exec_ctx, t, action.send_initial_window_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
[exec_ctx, t, &action]() {
queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
action.initial_window_size());
});
WithUrgency(
exec_ctx, t, action.send_max_frame_size_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
action.max_frame_size());
});
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@ -2497,7 +2484,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
t->flow_control.bdp_estimator->AddIncomingBytes(
t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@ -2547,9 +2534,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
grpc_chttp2_act_on_flowctl_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
NULL);
exec_ctx, t->flow_control->MakeAction(exec_ctx), t, NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@ -2565,7 +2550,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
t->flow_control.bdp_estimator->SchedulePing();
t->flow_control->bdp_estimator()->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked);
}
@ -2580,7 +2565,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
t->flow_control.bdp_estimator->StartPing();
t->flow_control->bdp_estimator()->StartPing();
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@ -2593,7 +2578,8 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx);
grpc_millis next_ping =
t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
@ -2816,13 +2802,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
bs->next_action.max_size_hint,
cur_length);
s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
cur_length);
grpc_chttp2_act_on_flowctl_action(
exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
&s->flow_control),
t, s);
exec_ctx, s->flow_control->MakeAction(exec_ctx), t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {

@ -137,14 +137,18 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
char* mf_str = fmt_uint32_diff_str(
t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
max_frame_size_);
gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
UrgencyString(send_transport_update_),
UrgencyString(send_stream_update_),
UrgencyString(send_setting_update_), iw_str, mf_str);
UrgencyString(send_initial_window_update_), iw_str,
UrgencyString(send_max_frame_size_update_), mf_str);
gpr_free(iw_str);
gpr_free(mf_str);
}
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t)
: t_(t), bdp_estimator_(t->peer_string) {}
uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
FlowControlTrace trace("t updt sent", this, nullptr);
const uint32_t target_announced_window = target_window();
@ -172,6 +176,10 @@ grpc_error* TransportFlowControl::ValidateRecvData(
return GRPC_ERROR_NONE;
}
StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
const grpc_chttp2_stream* s)
: tfc_(tfc), s_(s) {}
grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
FlowControlTrace trace(" data recv", tfc_, this);
@ -277,53 +285,76 @@ static double AdjustForMemoryPressure(grpc_resource_quota* quota,
return target;
}
FlowControlAction TransportFlowControl::UpdateForBdp(FlowControlAction action) {
double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx,
double value) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
if (!pid_controller_initialized_) {
last_pid_update_ = now;
pid_controller_initialized_ = true;
pid_controller_.Init(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)
.set_gain_d(0)
.set_initial_control_value(value)
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10));
return value;
}
double bdp_error = value - pid_controller_->last_control_value();
const double dt = (double)(now - last_pid_update_) * 1e-3;
last_pid_update_ = now;
return pid_controller_->Update(bdp_error, dt);
}
FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
int32_t value, grpc_chttp2_setting_id setting_id) {
int64_t delta =
(int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
return FlowControlAction::Urgency::QUEUE_UPDATE;
} else {
return FlowControlAction::Urgency::NO_ACTION_NEEDED;
}
}
FlowControlAction TransportFlowControl::UpdateForBdp(grpc_exec_ctx* exec_ctx,
FlowControlAction action) {
if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
int64_t estimate = -1;
if (bdp_estimator_.EstimateBdp(&estimate)) {
double target = 1 + log2((double)estimate);
// target might change based on how much memory pressure we are under
// TODO(ncteisen): experiment with setting target to be huge under low
// memory pressure.
target = AdjustForMemoryPressure(
grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
target);
// run our target through the pid controller to stabilize change.
// TODO(ncteisen): experiment with other controllers here.
double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target);
const double target =
pow(2, SmoothLogBdp(exec_ctx,
AdjustForMemoryPressure(
grpc_resource_user_quota(
grpc_endpoint_get_resource_user(t_->ep)),
1 + log2((double)estimate))));
// Though initial window 'could' drop to 0, we keep the floor at 128
tfc->target_initial_window_size =
(int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX);
grpc_chttp2_flowctl_urgency init_window_update_urgency =
delta_is_significant(tfc, tfc->target_initial_window_size,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
action.send_setting_update = init_window_update_urgency;
action.initial_window_size = (uint32_t)tfc->target_initial_window_size;
}
target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
action.set_send_initial_window_update(
DeltaUrgency(target_initial_window_size_,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
target_initial_window_size_);
}
// get bandwidth estimate and update max_frame accordingly.
double bw_dbl = -1;
if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) {
if (bdp_estimator_.EstimateBandwidth(&bw_dbl)) {
// we target the max of BDP or bandwidth in microseconds.
int32_t frame_size = (int32_t)GPR_CLAMP(
GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
tfc->target_initial_window_size),
target_initial_window_size_),
16384, 16777215);
grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
if (frame_size_urgency > action.send_setting_update) {
action.send_setting_update = frame_size_urgency;
}
action.max_frame_size = (uint32_t)frame_size;
}
action.set_send_max_frame_size_update(
DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
frame_size);
}
}
}
@ -354,56 +385,3 @@ FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
} // namespace chttp2
} // namespace grpc_core
// Returns an urgency with which to make an update
static grpc_chttp2_flowctl_urgency delta_is_significant(
const grpc_chttp2_transport_flowctl* tfc, int32_t value,
grpc_chttp2_setting_id setting_id) {
int64_t delta = (int64_t)value -
(int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
} else {
return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
}
}
// Takes in a target and uses the pid controller to return a stabilized
// guess at the new bdp.
static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx,
grpc_chttp2_transport_flowctl* tfc,
double target) {
grpc_millis now = grpc_exec_ctx_now(exec_ctx);
if (!tfc->pid_controller_initialized) {
tfc->last_pid_update = now;
tfc->pid_controller_initialized = true;
tfc->pid_controller.Init(grpc_core::PidController::Args()
.set_gain_p(4)
.set_gain_i(8)
.set_gain_d(0)
.set_initial_control_value(target)
.set_min_control_value(-1)
.set_max_control_value(25)
.set_integral_range(10));
return pow(2, target);
}
double bdp_error = target - tfc->pid_controller->last_control_value();
double dt = (double)(now - tfc->last_pid_update) * 1e-3;
double log2_bdp_guess = tfc->pid_controller->Update(bdp_error, dt);
tfc->last_pid_update = now;
return pow(2, log2_bdp_guess);
}
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
}
TRACEACTION(tfc, action);
return action;
}

@ -22,6 +22,7 @@
#include <stdint.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
@ -53,7 +54,12 @@ class FlowControlAction {
Urgency send_stream_update() const { return send_stream_update_; }
Urgency send_transport_update() const { return send_transport_update_; }
Urgency send_setting_update() const { return send_setting_update_; }
Urgency send_initial_window_update() const {
return send_initial_window_update_;
}
Urgency send_max_frame_size_update() const {
return send_max_frame_size_update_;
}
uint32_t initial_window_size() const { return initial_window_size_; }
uint32_t max_frame_size() const { return max_frame_size_; }
@ -61,12 +67,20 @@ class FlowControlAction {
send_stream_update_ = u;
return *this;
}
FlowControlAction& send_transport_update(Urgency u) {
FlowControlAction& set_send_transport_update(Urgency u) {
send_transport_update_ = u;
return *this;
}
FlowControlAction& send_setting_update(Urgency u) {
send_setting_update_ = u;
FlowControlAction& set_send_initial_window_update(Urgency u,
uint32_t update) {
send_initial_window_update_ = u;
initial_window_size_ = update;
return *this;
}
FlowControlAction& set_send_max_frame_size_update(Urgency u,
uint32_t update) {
send_max_frame_size_update_ = u;
max_frame_size_ = update;
return *this;
}
@ -76,7 +90,8 @@ class FlowControlAction {
private:
Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_setting_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
uint32_t initial_window_size_ = 0;
uint32_t max_frame_size_ = 0;
};
@ -114,12 +129,16 @@ class FlowControlTrace {
class TransportFlowControl {
public:
TransportFlowControl(const grpc_chttp2_transport* t);
~TransportFlowControl() {
if (pid_controller_initialized_) {
pid_controller_.Destroy();
}
}
// toggle bdp probing
void SetBdpProbe(bool enable);
// returns an announce if we should send a transport update to our peer,
// else returns zero; writing_anyway indicates if a write would happen
// regardless of the send - if it is false and this function returns non-zero,
@ -177,10 +196,14 @@ class TransportFlowControl {
}
}
BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
private:
FlowControlAction UpdateForBdp(grpc_exec_ctx* exec_ctx,
FlowControlAction action);
double SmoothBdp(grpc_exec_ctx* exec_ctx, double value);
double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
FlowControlAction::Urgency DeltaUrgency(int32_t value,
grpc_chttp2_setting_id setting_id);
const grpc_chttp2_transport* const t_;
@ -211,26 +234,28 @@ class TransportFlowControl {
int32_t target_initial_window_size_ = kDefaultWindow;
/** should we probe bdp? */
bool enable_bdp_probe_;
bool enable_bdp_probe_ = true;
/* bdp estimation */
grpc_core::BdpEstimator bdp_estimator_;
/* pid controller */
bool pid_controller_initialized_;
bool pid_controller_initialized_ = false;
grpc_core::ManualConstructor<grpc_core::PidController> pid_controller_;
grpc_millis last_pid_update_;
grpc_millis last_pid_update_ = 0;
};
class StreamFlowControl {
public:
StreamFlowControl(TransportFlowControl* tfc, grpc_chttp2_stream* s);
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl() {
tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
FlowControlAction UpdateAction(FlowControlAction action);
FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
FlowControlAction MakeAction(grpc_exec_ctx* exec_ctx) {
return UpdateAction(tfc_->MakeAction(exec_ctx));
}
// we have sent data on the wire, we must track this in our bookkeeping for
// the remote peer's flow control.
@ -276,16 +301,16 @@ class StreamFlowControl {
* window
* size of the transport... ie:
* remote_window = remote_window_delta + transport.initial_window_size */
int64_t remote_window_delta_;
int64_t remote_window_delta_ = 0;
/** window available for peer to send to us (as a delta on
* transport.initial_window_size)
* local_window = local_window_delta + transport.initial_window_size */
int64_t local_window_delta_;
int64_t local_window_delta_ = 0;
/** window available for peer to send to us over this stream that we have
* announced to the peer */
int64_t announced_window_delta_;
int64_t announced_window_delta_ = 0;
};
} // namespace chttp2

Loading…
Cancel
Save