Have BDP estimator schedule timers

pull/12903/head
Craig Tiller 7 years ago
parent 3be5e1e0cd
commit 00c207610b
  1. 42
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 2
      src/core/ext/transport/chttp2/transport/flow_control.cc
  3. 6
      src/core/ext/transport/chttp2/transport/internal.h
  4. 5
      src/core/lib/transport/bdp_estimator.cc
  5. 18
      src/core/lib/transport/bdp_estimator.h

@ -152,10 +152,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error); grpc_error *error);
static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error); grpc_error *error);
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error); grpc_error *error);
static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
void *tp, grpc_error *error);
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error); grpc_error *error);
@ -305,6 +309,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
t, grpc_combiner_scheduler(t->combiner)); t, grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
@ -564,6 +571,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
} }
schedule_bdp_ping_locked(exec_ctx, t);
grpc_chttp2_act_on_flowctl_action( grpc_chttp2_act_on_flowctl_action(
exec_ctx, exec_ctx,
grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
@ -619,6 +628,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
if (t->ping_state.is_delayed_ping_timer_set) { if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
} }
if (t->have_next_bdp_ping_timer) {
grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer);
}
switch (t->keepalive_state) { switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
@ -2434,12 +2446,14 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
} }
} }
#if 0
if (action.need_ping) { if (action.need_ping) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
t->flow_control.bdp_estimator->SchedulePing(); t->flow_control.bdp_estimator->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked); &t->finish_bdp_ping_locked);
} }
#endif
} }
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@ -2560,6 +2574,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0); GPR_TIMER_END("reading_action_locked", 0);
} }
static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
t->flow_control.bdp_estimator->SchedulePing();
send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
&t->finish_bdp_ping_locked);
}
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) { grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
@ -2579,9 +2601,27 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (GRPC_TRACER_ON(grpc_http_trace)) { if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
} }
t->flow_control.bdp_estimator->CompletePing(exec_ctx); if (error == GRPC_ERROR_CANCELLED) {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
&t->next_bdp_ping_timer_expired_locked);
}
static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
void *tp, grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
GPR_ASSERT(t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = false;
if (error == GRPC_ERROR_CANCELLED) {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
schedule_bdp_ping_locked(exec_ctx, t);
} }
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,

@ -459,8 +459,6 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
} }
} }
if (tfc->enable_bdp_probe) { if (tfc->enable_bdp_probe) {
action.need_ping = tfc->bdp_estimator->NeedPing(exec_ctx);
// get bdp estimate and update initial_window accordingly. // get bdp estimate and update initial_window accordingly.
int64_t estimate = -1; int64_t estimate = -1;
if (tfc->bdp_estimator->EstimateBdp(&estimate)) { if (tfc->bdp_estimator->EstimateBdp(&estimate)) {

@ -422,6 +422,7 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool; grpc_chttp2_write_cb *write_cb_pool;
/* bdp estimator */ /* bdp estimator */
grpc_closure next_bdp_ping_timer_expired_locked;
grpc_closure start_bdp_ping_locked; grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked; grpc_closure finish_bdp_ping_locked;
@ -442,6 +443,10 @@ struct grpc_chttp2_transport {
/** destructive cleanup closure */ /** destructive cleanup closure */
grpc_closure destructive_reclaimer_locked; grpc_closure destructive_reclaimer_locked;
/* next bdp ping timer */
bool have_next_bdp_ping_timer;
grpc_timer next_bdp_ping_timer;
/* keep-alive ping support */ /* keep-alive ping support */
/** Closure to initialize a keepalive ping */ /** Closure to initialize a keepalive ping */
grpc_closure init_keepalive_ping_locked; grpc_closure init_keepalive_ping_locked;
@ -749,7 +754,6 @@ typedef struct {
grpc_chttp2_flowctl_urgency send_setting_update; grpc_chttp2_flowctl_urgency send_setting_update;
uint32_t initial_window_size; uint32_t initial_window_size;
uint32_t max_frame_size; uint32_t max_frame_size;
bool need_ping;
} grpc_chttp2_flowctl_action; } grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell // Reads the flow control data and returns and actionable struct that will tell

@ -33,13 +33,12 @@ BdpEstimator::BdpEstimator(const char *name)
accumulator_(0), accumulator_(0),
estimate_(65536), estimate_(65536),
ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)), ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)),
next_ping_scheduled_(0),
inter_ping_delay_(100.0), // start at 100ms inter_ping_delay_(100.0), // start at 100ms
stable_estimate_count_(0), stable_estimate_count_(0),
bw_est_(0), bw_est_(0),
name_(name) {} name_(name) {}
void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_); gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
@ -79,7 +78,7 @@ void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
} }
ping_state_ = PingState::UNSCHEDULED; ping_state_ = PingState::UNSCHEDULED;
accumulator_ = 0; accumulator_ = 0;
next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
} }
} // namespace grpc_core } // namespace grpc_core

@ -49,18 +49,6 @@ class BdpEstimator {
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
// Returns true if the user should schedule a ping
bool NeedPing(grpc_exec_ctx *exec_ctx) const {
switch (ping_state_) {
case PingState::UNSCHEDULED:
return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_;
case PingState::SCHEDULED:
case PingState::STARTED:
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
// Schedule a ping: call in response to receiving a true from // Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started) // transport (but not necessarily started)
@ -88,8 +76,8 @@ class BdpEstimator {
ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
} }
// Completes a previously started ping // Completes a previously started ping, returns when to schedule the next one
void CompletePing(grpc_exec_ctx *exec_ctx); grpc_millis CompletePing(grpc_exec_ctx *exec_ctx) GRPC_MUST_USE_RESULT;
private: private:
enum class PingState { UNSCHEDULED, SCHEDULED, STARTED }; enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
@ -99,8 +87,6 @@ class BdpEstimator {
int64_t estimate_; int64_t estimate_;
// when was the current ping started? // when was the current ping started?
gpr_timespec ping_start_time_; gpr_timespec ping_start_time_;
// when should the next ping start?
grpc_millis next_ping_scheduled_;
int inter_ping_delay_; int inter_ping_delay_;
int stable_estimate_count_; int stable_estimate_count_;
double bw_est_; double bw_est_;

Loading…
Cancel
Save