From 31620ca54f2bd7a799d70d77abc29020e83a6798 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 15 Sep 2017 12:40:24 -0700 Subject: [PATCH] Squash ping queues, make BDP pings non-initiating, make BDP queries speedup/slowdown --- .../chttp2/transport/chttp2_transport.c | 63 +++++++------------ .../transport/chttp2/transport/flow_control.c | 9 --- .../ext/transport/chttp2/transport/internal.h | 12 +--- .../ext/transport/chttp2/transport/writing.c | 41 +++--------- src/core/lib/transport/bdp_estimator.c | 30 ++++++++- src/core/lib/transport/bdp_estimator.h | 5 ++ 6 files changed, 66 insertions(+), 94 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 79a9ed827f7..ecd4322b436 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -144,11 +144,9 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -static void send_ping_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, - grpc_closure *on_complete, - grpc_chttp2_initiate_write_reason initiate_write_reason); +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_initiate, + grpc_closure *on_complete); static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); @@ -892,9 +890,6 @@ static void inc_initiate_write_reason( case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); break; - case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx); - break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( exec_ctx); @@ -1701,28 +1696,21 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ - for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[i]; - for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { - grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); - } + grpc_chttp2_ping_queue *pq = &t->ping_queue; + for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { + grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); } GRPC_ERROR_UNREF(error); } -static void send_ping_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, - grpc_closure *on_ack, - grpc_chttp2_initiate_write_reason initiate_write_reason) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_initiate, grpc_closure *on_ack) { + grpc_chttp2_ping_queue *pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); - if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, - GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason); - } + grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, + GRPC_ERROR_NONE); } static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -1735,8 +1723,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint64_t id) { - grpc_chttp2_ping_queue *pq = - &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT]; + grpc_chttp2_ping_queue *pq = &t->ping_queue; if (pq->inflight_id != id) { char *from = grpc_endpoint_get_peer(t->ep); gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id); @@ -1806,9 +1793,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, - op->send_ping, - GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); + send_ping_locked(exec_ctx, t, NULL, op->send_ping); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { @@ -2449,10 +2436,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, if (action.need_ping) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); - send_ping_locked(exec_ctx, t, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked, - GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); } } @@ -2560,7 +2545,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t, + exec_ctx, grpc_chttp2_flowctl_get_action(&t->flow_control, NULL), t, NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { @@ -2647,10 +2632,10 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, - &t->start_keepalive_ping_locked, - &t->finish_keepalive_ping_locked, - GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + send_ping_locked(exec_ctx, t, &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init( @@ -3141,8 +3126,6 @@ const char *grpc_chttp2_initiate_write_reason_string( return "TRANSPORT_FLOW_CONTROL"; case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: return "SEND_SETTINGS"; - case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: - return "BDP_ESTIMATOR_PING"; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index 569a6349d3d..704c0ad7bc7 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -441,14 +441,6 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; } } - TRACEACTION(tfc, action); - return action; -} - -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( - grpc_chttp2_transport_flowctl* tfc) { - grpc_chttp2_flowctl_action action; - memset(&action, 0, sizeof(action)); if (tfc->enable_bdp_probe) { action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator); @@ -496,7 +488,6 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( } } } - TRACEACTION(tfc, action); return action; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index c2dfce7c9c7..2f5320985ae 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -61,12 +61,6 @@ typedef enum { GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, } grpc_chttp2_write_state; -typedef enum { - GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */ -} grpc_chttp2_ping_type; - typedef enum { GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, @@ -93,7 +87,6 @@ typedef enum { GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, - GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, @@ -370,7 +363,7 @@ struct grpc_chttp2_transport { uint32_t last_new_stream_id; /** ping queues for various ping insertion points */ - grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT]; + grpc_chttp2_ping_queue ping_queue; grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; uint64_t ping_ctr; /* unique id for pings */ @@ -754,9 +747,6 @@ typedef struct { grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( - grpc_chttp2_transport_flowctl *tfc); - // Takes in a flow control action and performs all the needed operations. void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, grpc_chttp2_flowctl_action action, diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 3ded801985d..c132195457e 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -42,18 +42,9 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->write_cb_pool = cb; } -static void collapse_pings_from_into(grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_chttp2_ping_queue *pq) { - for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) { - grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]); - } -} - static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; + grpc_chttp2_transport *t) { + grpc_chttp2_ping_queue *pq = &t->ping_queue; if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { /* no ping needed: wait */ return; @@ -100,17 +91,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, } return; } - /* coalesce equivalent pings into this one */ - switch (ping_type) { - case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE: - collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq); - break; - case GRPC_CHTTP2_PING_ON_NEXT_WRITE: - break; - case GRPC_CHTTP2_PING_TYPE_COUNT: - GPR_UNREACHABLE_CODE(break); - } - pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type; + pq->inflight_id = t->ping_ctr; t->ping_ctr++; GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], @@ -179,6 +160,12 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); + for (size_t i = 0; i < t->ping_ack_count; i++) { + grpc_slice_buffer_add(&t->outbuf, + grpc_chttp2_ping_create(1, t->ping_acks[i])); + } + t->ping_ack_count = 0; + if (t->dirtied_local_settings && !t->sent_local_settings) { grpc_slice_buffer_add( &t->outbuf, @@ -481,8 +468,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control); if (transport_announce) { - maybe_initiate_ping(exec_ctx, t, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_window_update_create(0, transport_announce, @@ -496,13 +481,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } - for (size_t i = 0; i < t->ping_ack_count; i++) { - grpc_slice_buffer_add(&t->outbuf, - grpc_chttp2_ping_create(1, t->ping_acks[i])); - } - t->ping_ack_count = 0; - - maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE); + maybe_initiate_ping(exec_ctx, t); GPR_TIMER_END("grpc_chttp2_begin_write", 0); diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c index 8b576934134..2a3ffbbd018 100644 --- a/src/core/lib/transport/bdp_estimator.c +++ b/src/core/lib/transport/bdp_estimator.c @@ -29,8 +29,11 @@ grpc_tracer_flag grpc_bdp_estimator_trace = void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { estimator->estimate = 65536; estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; + estimator->ping_start_time = gpr_time_0(GPR_CLOCK_MONOTONIC); estimator->name = name; estimator->bw_est = 0; + estimator->inter_ping_delay = 100.0; // start at 100ms + estimator->stable_estimate_count = 0; } bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, @@ -53,7 +56,8 @@ void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) { switch (estimator->ping_state) { case GRPC_BDP_PING_UNSCHEDULED: - return true; + return gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), + estimator->ping_start_time) >= 0; case GRPC_BDP_PING_SCHEDULED: return false; case GRPC_BDP_PING_STARTED: @@ -84,10 +88,11 @@ void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) { } void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { - gpr_timespec dt_ts = - gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), estimator->ping_start_time); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_ts = gpr_time_sub(now, estimator->ping_start_time); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; double bw = dt > 0 ? ((double)estimator->accumulator / dt) : 0; + int start_inter_ping_delay = estimator->inter_ping_delay; if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64 " dt=%lf bw=%lfMbs bw_est=%lfMbs", @@ -104,7 +109,26 @@ void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) { gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64, estimator->name, estimator->estimate); } + estimator->inter_ping_delay /= 2; // if the ping estimate changes, + // exponentially get faster at probing + } else if (estimator->inter_ping_delay < 10000) { + estimator->stable_estimate_count++; + if (estimator->stable_estimate_count >= 2) { + estimator->inter_ping_delay += + 100 + + (int)(rand() * 100.0 / RAND_MAX); // if the ping estimate is steady, + // slowly ramp down the probe time + } + } + if (start_inter_ping_delay != estimator->inter_ping_delay) { + estimator->stable_estimate_count = 0; + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "bdp[%s]:update_inter_time to %dms", estimator->name, + estimator->inter_ping_delay); + } } estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED; estimator->accumulator = 0; + estimator->ping_start_time = gpr_time_add( + now, gpr_time_from_millis(estimator->inter_ping_delay, GPR_TIMESPAN)); } diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 1ef0dc99ddb..670e38dd4bb 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -39,7 +39,12 @@ typedef struct grpc_bdp_estimator { grpc_bdp_estimator_ping_state ping_state; int64_t accumulator; int64_t estimate; + // case ping_state of + // GRPC_BDP_PING_UNSCHEDULED => when to start the next ping + // GRPC_BDP_PING_STARTED => when the current ping was started gpr_timespec ping_start_time; + int inter_ping_delay; + int stable_estimate_count; double bw_est; const char *name; } grpc_bdp_estimator;