From 8b34eca90bc25e505cef5d50aab483a62a2df373 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 20 Oct 2016 21:13:31 -0700 Subject: [PATCH] Allow updating initial_window_size, connection_window based on observed bdp --- .../chttp2/transport/chttp2_transport.c | 108 +++++++++++++++--- .../chttp2/transport/frame_settings.c | 7 +- .../ext/transport/chttp2/transport/internal.h | 4 + 3 files changed, 97 insertions(+), 22 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 793d3cc5788..e9d5454c3c1 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -262,6 +262,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_bdp_estimator_init(&t->bdp_estimator); t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC); + t->last_pid_update = t->last_bdp_ping_finished; + grpc_pid_controller_init(&t->pid_controller, 128, 64, 0); + t->bdp_guess = DEFAULT_WINDOW; grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -1734,6 +1737,39 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ +static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + double bdp_dbl) { + uint32_t bdp; + if (bdp_dbl <= 0) { + bdp = 0; + } else if (bdp_dbl > UINT32_MAX) { + bdp = UINT32_MAX; + } else { + bdp = (uint32_t)(bdp_dbl); + } + int64_t delta = + (int64_t)bdp - + (int64_t)t->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { + return; + } + gpr_log(GPR_DEBUG, "%s: %d %" PRId64, t->peer_string, bdp, delta); + if (delta < 0) { + t->retract_incoming_window += -delta; + } else if (delta <= t->retract_incoming_window) { + t->retract_incoming_window -= delta; + } else { + delta -= t->retract_incoming_window; + t->retract_incoming_window = 0; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("update_bdp", t, announce_incoming_window, + delta); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("update_bdp", t, incoming_window, delta); + grpc_chttp2_initiate_write(exec_ctx, t, false, "update_bdp"); + } + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); +} + static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { /* Control flow: @@ -1826,7 +1862,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_list_add_writable_stream(t, s); + grpc_chttp2_become_writable(exec_ctx, t, s, false, "unstalled"); } } t->initial_window_update = 0; @@ -1860,6 +1896,58 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, send_ping_locked(exec_ctx, t, &t->finish_bdp_ping); } + int64_t estimate; + if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); + double dt = dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 3) { + grpc_pid_controller_reset(&t->pid_controller); + } + double new_guess = t->bdp_guess + grpc_pid_controller_update( + &t->pid_controller, + 2.0 * estimate - t->bdp_guess, dt); + if (new_guess > t->bdp_guess * 2) { + grpc_pid_controller_reset(&t->pid_controller); + t->bdp_guess *= 2; + } else if (new_guess < t->bdp_guess * 0.5) { + grpc_pid_controller_reset(&t->pid_controller); + t->bdp_guess *= 0.5; + } else { + t->bdp_guess = new_guess; + } + update_bdp(exec_ctx, t, t->bdp_guess); + if (0) + gpr_log(GPR_DEBUG, "bdp guess %s: %lf (est=%" PRId64 " dt=%lf int=%lf)", + t->peer_string, t->bdp_guess, estimate, dt, + t->pid_controller.error_integral); + t->last_pid_update = now; + + /* + gpr_log( + GPR_DEBUG, "%s BDP estimate: %" PRId64 + " (%d %d) [%d %d %d %d %d %d %d %d %d %d %d %d %d %d %d + %d]", + t->peer_string, estimate, t->bdp_estimator.first_sample_idx, + t->bdp_estimator.num_samples, (int)t->bdp_estimator.samples[0], + (int)t->bdp_estimator.samples[1], + (int)t->bdp_estimator.samples[2], + (int)t->bdp_estimator.samples[3], + (int)t->bdp_estimator.samples[4], + (int)t->bdp_estimator.samples[5], + (int)t->bdp_estimator.samples[6], + (int)t->bdp_estimator.samples[7], + (int)t->bdp_estimator.samples[8], + (int)t->bdp_estimator.samples[9], + (int)t->bdp_estimator.samples[10], + (int)t->bdp_estimator.samples[11], + (int)t->bdp_estimator.samples[12], + (int)t->bdp_estimator.samples[13], + (int)t->bdp_estimator.samples[14], + (int)t->bdp_estimator.samples[15]); + */ + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -1883,26 +1971,8 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; grpc_bdp_estimator_complete_ping(&t->bdp_estimator); - t->last_bdp_ping_finished = gpr_now(GPR_CLOCK_MONOTONIC); - int64_t estimate; - if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { - gpr_log( - GPR_DEBUG, "%s BDP estimate: %" PRId64 - " (%d %d) [%d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d]", - t->peer_string, estimate, t->bdp_estimator.first_sample_idx, - t->bdp_estimator.num_samples, (int)t->bdp_estimator.samples[0], - (int)t->bdp_estimator.samples[1], (int)t->bdp_estimator.samples[2], - (int)t->bdp_estimator.samples[3], (int)t->bdp_estimator.samples[4], - (int)t->bdp_estimator.samples[5], (int)t->bdp_estimator.samples[6], - (int)t->bdp_estimator.samples[7], (int)t->bdp_estimator.samples[8], - (int)t->bdp_estimator.samples[9], (int)t->bdp_estimator.samples[10], - (int)t->bdp_estimator.samples[11], (int)t->bdp_estimator.samples[12], - (int)t->bdp_estimator.samples[13], (int)t->bdp_estimator.samples[14], - (int)t->bdp_estimator.samples[15]); - } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 92022f90c96..340a1459091 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -236,7 +236,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[parser->id] != parser->value) { - t->initial_window_update = + t->initial_window_update += (int64_t)parser->value - parser->incoming_settings[parser->id]; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "adding %d for initial_window change", @@ -245,8 +245,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { - gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d", - t->is_client ? "CLI" : "SVR", parser->id, parser->value); + gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d", + t->is_client ? "CLI" : "SVR", t->peer_string, parser->id, + parser->value); } } else if (grpc_http_trace) { gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)", diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 2faeb9d25c6..6416ee820a4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -52,6 +52,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/pid_controller.h" #include "src/core/lib/transport/transport_impl.h" /* streams are kept in various linked lists depending on what things need to @@ -328,9 +329,12 @@ struct grpc_chttp2_transport { /* bdp estimator */ grpc_bdp_estimator bdp_estimator; + grpc_pid_controller pid_controller; + double bdp_guess; grpc_closure finish_bdp_ping; grpc_closure finish_bdp_ping_locked; gpr_timespec last_bdp_ping_finished; + gpr_timespec last_pid_update; /* if non-NULL, close the transport with this error when writes are finished */