From 00c207610bdcb7b34701115a4799c081294f8e3b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 8 Oct 2017 21:50:33 -0700 Subject: [PATCH] Have BDP estimator schedule timers --- .../chttp2/transport/chttp2_transport.cc | 44 ++++++++++++++++++- .../chttp2/transport/flow_control.cc | 2 - .../ext/transport/chttp2/transport/internal.h | 6 ++- src/core/lib/transport/bdp_estimator.cc | 5 +-- src/core/lib/transport/bdp_estimator.h | 18 +------- 5 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0ef06ae6e00..97ac59fa868 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -152,10 +152,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error); static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -305,6 +309,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, + grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, @@ -564,6 +571,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } + schedule_bdp_ping_locked(exec_ctx, t); + grpc_chttp2_act_on_flowctl_action( exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, @@ -619,6 +628,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, if (t->ping_state.is_delayed_ping_timer_set) { grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } + if (t->have_next_bdp_ping_timer) { + grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); + } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); @@ -2434,12 +2446,14 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } +#if 0 if (action.need_ping) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); t->flow_control.bdp_estimator->SchedulePing(); send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } +#endif } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2560,6 +2574,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + t->flow_control.bdp_estimator->SchedulePing(); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); +} + static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; @@ -2579,9 +2601,27 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); } - t->flow_control.bdp_estimator->CompletePing(exec_ctx); + if (error == GRPC_ERROR_CANCELLED) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx); + GPR_ASSERT(!t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = true; + grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, + &t->next_bdp_ping_timer_expired_locked); +} - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + GPR_ASSERT(t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = false; + if (error == GRPC_ERROR_CANCELLED) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + schedule_bdp_ping_locked(exec_ctx, t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 60c43d840a4..d0e80c4bd5d 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -459,8 +459,6 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( } } if (tfc->enable_bdp_probe) { - action.need_ping = tfc->bdp_estimator->NeedPing(exec_ctx); - // get bdp estimate and update initial_window accordingly. int64_t estimate = -1; if (tfc->bdp_estimator->EstimateBdp(&estimate)) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 05b677dd4b6..b4eb033a47f 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -422,6 +422,7 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; /* bdp estimator */ + grpc_closure next_bdp_ping_timer_expired_locked; grpc_closure start_bdp_ping_locked; grpc_closure finish_bdp_ping_locked; @@ -442,6 +443,10 @@ struct grpc_chttp2_transport { /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + /* next bdp ping timer */ + bool have_next_bdp_ping_timer; + grpc_timer next_bdp_ping_timer; + /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ grpc_closure init_keepalive_ping_locked; @@ -749,7 +754,6 @@ typedef struct { grpc_chttp2_flowctl_urgency send_setting_update; uint32_t initial_window_size; uint32_t max_frame_size; - bool need_ping; } grpc_chttp2_flowctl_action; // Reads the flow control data and returns and actionable struct that will tell diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 2a1c97c84e3..f1597014b17 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -33,13 +33,12 @@ BdpEstimator::BdpEstimator(const char *name) accumulator_(0), estimate_(65536), ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)), - next_ping_scheduled_(0), inter_ping_delay_(100.0), // start at 100ms stable_estimate_count_(0), bw_est_(0), name_(name) {} -void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { +grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; @@ -79,7 +78,7 @@ void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { } ping_state_ = PingState::UNSCHEDULED; accumulator_ = 0; - next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; + return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; } } // namespace grpc_core diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index a4acf30dedf..cda37f35a43 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -49,18 +49,6 @@ class BdpEstimator { void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } - // Returns true if the user should schedule a ping - bool NeedPing(grpc_exec_ctx *exec_ctx) const { - switch (ping_state_) { - case PingState::UNSCHEDULED: - return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_; - case PingState::SCHEDULED: - case PingState::STARTED: - return false; - } - GPR_UNREACHABLE_CODE(return false); - } - // Schedule a ping: call in response to receiving a true from // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // transport (but not necessarily started) @@ -88,8 +76,8 @@ class BdpEstimator { ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); } - // Completes a previously started ping - void CompletePing(grpc_exec_ctx *exec_ctx); + // Completes a previously started ping, returns when to schedule the next one + grpc_millis CompletePing(grpc_exec_ctx *exec_ctx) GRPC_MUST_USE_RESULT; private: enum class PingState { UNSCHEDULED, SCHEDULED, STARTED }; @@ -99,8 +87,6 @@ class BdpEstimator { int64_t estimate_; // when was the current ping started? gpr_timespec ping_start_time_; - // when should the next ping start? - grpc_millis next_ping_scheduled_; int inter_ping_delay_; int stable_estimate_count_; double bw_est_;