diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 059caaa290d..df8fc1b15bd 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -265,8 +265,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; - t->remote_window = DEFAULT_WINDOW; - t->announced_window = DEFAULT_WINDOW; + t->flow_control.remote_window = DEFAULT_WINDOW; + t->flow_control.announced_window = DEFAULT_WINDOW; +#ifndef NDEBUG + t->flow_control.t = t; +#endif t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -705,6 +708,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } +#ifndef NDEBUG + s->flow_control.s = s; +#endif + GPR_TIMER_END("init_stream", 0); return 0; @@ -753,7 +760,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); - grpc_chttp2_flowctl_destroy_stream(s); + grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -1449,9 +1456,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, already_received = s->frame_storage.length + s->unprocessed_incoming_frames_buffer.length; } - grpc_chttp2_flowctl_incoming_bs_update(t, s, 5, already_received); - grpc_chttp2_flowctl_act_on_action( - exec_ctx, grpc_chttp2_flowctl_get_action(t, s), t, s); + if (!s->read_closed) { + grpc_chttp2_flowctl_incoming_bs_update( + &t->flow_control, &s->flow_control, + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + 5, already_received); + grpc_chttp2_flowctl_act_on_action( + exec_ctx, + grpc_chttp2_flowctl_get_action( + &t->flow_control, &s->flow_control, false, + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + t, s); + } } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -2252,8 +2270,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_BEGIN("post_parse_locked", 0); - if (t->initial_window_update != 0) { - if (t->initial_window_update > 0) { + if (t->flow_control.initial_window_update != 0) { + if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_become_writable( @@ -2261,7 +2279,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, "unstalled"); } } - t->initial_window_update = 0; + t->flow_control.initial_window_update = 0; } GPR_TIMER_END("post_parse_locked", 0); } @@ -2543,11 +2561,19 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = bs->stream; size_t cur_length = s->frame_storage.length; - grpc_chttp2_flowctl_incoming_bs_update(t, s, bs->next_action.max_size_hint, - cur_length); - grpc_chttp2_flowctl_act_on_action(exec_ctx, - grpc_chttp2_flowctl_get_action(t, s), t, s); - + if (!s->read_closed) { + grpc_chttp2_flowctl_incoming_bs_update( + &t->flow_control, &s->flow_control, + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + bs->next_action.max_size_hint, cur_length); + grpc_chttp2_flowctl_act_on_action( + exec_ctx, grpc_chttp2_flowctl_get_action( + &t->flow_control, &s->flow_control, false, + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + t, s); + } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index b9d2cbf53c9..983c8880b45 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -28,7 +28,7 @@ #include "src/core/lib/support/string.h" static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport* t); + const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_local_window); #ifndef NDEBUG @@ -41,15 +41,18 @@ typedef struct { int64_t announced_window_delta; } shadow_flow_control; -static void pretrace(shadow_flow_control* sfc, grpc_chttp2_transport* t, - grpc_chttp2_stream* s) { - sfc->remote_window = t->remote_window; - sfc->target_window = grpc_chttp2_target_announced_window(t); - sfc->announced_window = t->announced_window; - if (s != NULL) { - sfc->remote_window_delta = s->remote_window_delta; - sfc->local_window_delta = s->local_window_delta; - sfc->announced_window_delta = s->announced_window_delta; +static void pretrace(shadow_flow_control* shadow_fc, + grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + shadow_fc->remote_window = tfc->remote_window; + shadow_fc->target_window = grpc_chttp2_target_announced_window( + tfc, tfc->t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + shadow_fc->announced_window = tfc->announced_window; + if (sfc != NULL) { + shadow_fc->remote_window_delta = sfc->remote_window_delta; + shadow_fc->local_window_delta = sfc->local_window_delta; + shadow_fc->announced_window_delta = sfc->announced_window_delta; } } @@ -65,35 +68,39 @@ static char* fmt_str(int64_t old, int64_t new) { return str_lp; } -static void posttrace(shadow_flow_control* sfc, grpc_chttp2_transport* t, - grpc_chttp2_stream* s, char* reason) { +static void posttrace(shadow_flow_control* shadow_fc, + grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, char* reason) { uint32_t acked_local_window = - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; uint32_t remote_window = - t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - char* trw_str = fmt_str(sfc->remote_window, t->remote_window); + tfc->t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window); char* tlw_str = - fmt_str(sfc->target_window, grpc_chttp2_target_announced_window(t)); - char* taw_str = fmt_str(sfc->announced_window, t->announced_window); + fmt_str(shadow_fc->target_window, + grpc_chttp2_target_announced_window(tfc, acked_local_window)); + char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window); char* srw_str; char* slw_str; char* saw_str; - if (s != NULL) { - srw_str = fmt_str(sfc->remote_window_delta + remote_window, - s->remote_window_delta + remote_window); - slw_str = fmt_str(sfc->local_window_delta + acked_local_window, - s->local_window_delta + acked_local_window); - saw_str = fmt_str(sfc->announced_window_delta + acked_local_window, - s->announced_window_delta + acked_local_window); + if (sfc != NULL) { + srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window, + sfc->remote_window_delta + remote_window); + slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window, + sfc->local_window_delta + acked_local_window); + saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window, + sfc->announced_window_delta + acked_local_window); } else { srw_str = gpr_leftpad("", ' ', 30); slw_str = gpr_leftpad("", ' ', 30); saw_str = gpr_leftpad("", ' ', 30); } gpr_log(GPR_DEBUG, - "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", t, - s != NULL ? s->id : 0, t->is_client ? "cli" : "svr", reason, trw_str, - tlw_str, taw_str, srw_str, slw_str, saw_str); + "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", + tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", + reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); gpr_free(trw_str); gpr_free(tlw_str); gpr_free(taw_str); @@ -122,190 +129,190 @@ static void trace_action(grpc_chttp2_flowctl_action action) { urgency_to_string(action.send_stream_update)); } -#define PRETRACE(t, s) \ - shadow_flow_control sfc; \ - GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&sfc, t, s)) -#define POSTTRACE(t, s, reason) \ - GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&sfc, t, s, reason)) +#define PRETRACE(tfc, sfc) \ + shadow_flow_control shadow_fc; \ + GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) +#define POSTTRACE(tfc, sfc, reason) \ + GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) #define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action)) #else -#define PRETRACE(t, s) -#define POSTTRACE(t, s, reason) +#define PRETRACE(tfc, sfc) +#define POSTTRACE(tfc, sfc, reason) #define TRACEACTION(action) #endif /* How many bytes of incoming flow control would we like to advertise */ static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport* t) { + const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window) { return (uint32_t)GPR_MIN( (int64_t)((1u << 31) - 1), - t->announced_stream_total_over_incoming_window + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + tfc->announced_stream_total_over_incoming_window + acked_init_window); } // we have sent data on the wire, we must track this in our bookkeeping for the // remote peer's flow control. -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport* t, - grpc_chttp2_stream* s, int64_t size) { - PRETRACE(t, s); - t->remote_window -= size; - s->remote_window_delta -= size; - POSTTRACE(t, s, " data sent"); +void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + int64_t size) { + PRETRACE(tfc, sfc); + tfc->remote_window -= size; + sfc->remote_window_delta -= size; + POSTTRACE(tfc, sfc, " data sent"); } -static void announced_window_delta_preupdate(grpc_chttp2_transport* t, - grpc_chttp2_stream* s) { - if (s->announced_window_delta > 0) { - t->announced_stream_total_over_incoming_window -= s->announced_window_delta; +static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + if (sfc->announced_window_delta > 0) { + tfc->announced_stream_total_over_incoming_window -= + sfc->announced_window_delta; } else { - t->announced_stream_total_under_incoming_window += - -s->announced_window_delta; + tfc->announced_stream_total_under_incoming_window += + -sfc->announced_window_delta; } } -static void announced_window_delta_postupdate(grpc_chttp2_transport* t, - grpc_chttp2_stream* s) { - if (s->announced_window_delta > 0) { - t->announced_stream_total_over_incoming_window += s->announced_window_delta; +static void announced_window_delta_postupdate( + grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { + if (sfc->announced_window_delta > 0) { + tfc->announced_stream_total_over_incoming_window += + sfc->announced_window_delta; } else { - t->announced_stream_total_under_incoming_window -= - -s->announced_window_delta; + tfc->announced_stream_total_under_incoming_window -= + -sfc->announced_window_delta; } } // We have received data from the wire. We must track this in our own flow // control bookkeeping. // Returns an error if the incoming frame violates our flow control. -grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport* t, - grpc_chttp2_stream* s, - int64_t incoming_frame_size) { - PRETRACE(t, s); - if (incoming_frame_size > t->announced_window) { +grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + int64_t incoming_frame_size, + uint32_t acked_init_window, + uint32_t sent_init_window) { + PRETRACE(tfc, sfc); + if (incoming_frame_size > tfc->announced_window) { char* msg; - gpr_asprintf(&msg, "frame of size %d overflows local window of %" PRId64, - t->incoming_frame_size, t->announced_window); + gpr_asprintf(&msg, + "frame of size %" PRId64 " overflows local window of %" PRId64, + incoming_frame_size, tfc->announced_window); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return err; } - // TODO(ncteisen): can this ever be null? ANSWER: only when incoming frame - // size is zero? - if (s != NULL) { + if (sfc != NULL) { int64_t acked_stream_window = - s->announced_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - int64_t sent_stream_window = - s->announced_window_delta + - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + sfc->announced_window_delta + acked_init_window; + int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window; if (incoming_frame_size > acked_stream_window) { if (incoming_frame_size <= sent_stream_window) { gpr_log( GPR_ERROR, - "Incoming frame of size %d exceeds local window size of %" PRId64 + "Incoming frame of size %" PRId64 + " exceeds local window size of %" PRId64 ".\n" "The (un-acked, future) window size would be %" PRId64 " which is not exceeded.\n" "This would usually cause a disconnection, but allowing it due to" "broken HTTP2 implementations in the wild.\n" "See (for example) https://github.com/netty/netty/issues/6520.", - t->incoming_frame_size, acked_stream_window, sent_stream_window); + incoming_frame_size, acked_stream_window, sent_stream_window); } else { char* msg; - gpr_asprintf(&msg, - "frame of size %d overflows local window of %" PRId64, - t->incoming_frame_size, acked_stream_window); + gpr_asprintf(&msg, "frame of size %" PRId64 + " overflows local window of %" PRId64, + incoming_frame_size, acked_stream_window); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return err; } } - announced_window_delta_preupdate(t, s); - s->announced_window_delta -= incoming_frame_size; - announced_window_delta_postupdate(t, s); - s->local_window_delta -= incoming_frame_size; - s->received_bytes += incoming_frame_size; + announced_window_delta_preupdate(tfc, sfc); + sfc->announced_window_delta -= incoming_frame_size; + announced_window_delta_postupdate(tfc, sfc); + sfc->local_window_delta -= incoming_frame_size; } - t->announced_window -= incoming_frame_size; + tfc->announced_window -= incoming_frame_size; - POSTTRACE(t, s, " data recv"); + POSTTRACE(tfc, sfc, " data recv"); return GRPC_ERROR_NONE; } // Returns a non zero announce integer if we should send a transport window // update uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport* t) { - PRETRACE(t, NULL); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(t); + grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window, + bool has_outbuf) { + PRETRACE(tfc, NULL); + uint32_t target_announced_window = + grpc_chttp2_target_announced_window(tfc, acked_init_window); uint32_t threshold_to_send_transport_window_update = - t->outbuf.count > 0 ? 3 * target_announced_window / 4 - : target_announced_window / 2; - if (t->announced_window <= threshold_to_send_transport_window_update && - t->announced_window != target_announced_window) { + has_outbuf ? 3 * target_announced_window / 4 + : target_announced_window / 2; + if (tfc->announced_window <= threshold_to_send_transport_window_update && + tfc->announced_window != target_announced_window) { uint32_t announce = (uint32_t)GPR_CLAMP( - target_announced_window - t->announced_window, 0, UINT32_MAX); - t->announced_window += announce; - POSTTRACE(t, NULL, "t updt sent"); + target_announced_window - tfc->announced_window, 0, UINT32_MAX); + tfc->announced_window += announce; + POSTTRACE(tfc, NULL, "t updt sent"); return announce; } GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[0][%s] will not to send transport update", t, - t->is_client ? "cli" : "svr")); + gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, + tfc->t->is_client ? "cli" : "svr")); return 0; } // Returns a non zero announce integer if we should send a stream window update -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(grpc_chttp2_stream* s) { - PRETRACE(s->t, s); - if (s->local_window_delta > s->announced_window_delta) { +uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( + grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { + PRETRACE(tfc, sfc); + if (sfc->local_window_delta > sfc->announced_window_delta) { uint32_t announce = (uint32_t)GPR_CLAMP( - s->local_window_delta - s->announced_window_delta, 0, UINT32_MAX); - announced_window_delta_preupdate(s->t, s); - s->announced_window_delta += announce; - announced_window_delta_postupdate(s->t, s); - POSTTRACE(s->t, s, "s updt sent"); + sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); + announced_window_delta_preupdate(tfc, sfc); + sfc->announced_window_delta += announce; + announced_window_delta_postupdate(tfc, sfc); + POSTTRACE(tfc, sfc, "s updt sent"); return announce; } GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[%u][%s] will not to send stream update", s->t, - s->id, s->t->is_client ? "cli" : "svr")); + gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc, + sfc->s->id, tfc->t->is_client ? "cli" : "svr")); return 0; } // we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update(grpc_chttp2_transport* t, - uint32_t size) { - PRETRACE(t, NULL); - t->remote_window += size; - POSTTRACE(t, NULL, "t updt recv"); +void grpc_chttp2_flowctl_recv_transport_update( + grpc_chttp2_transport_flowctl* tfc, uint32_t size) { + PRETRACE(tfc, NULL); + tfc->remote_window += size; + POSTTRACE(tfc, NULL, "t updt recv"); } // we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_stream* s, +void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, uint32_t size) { - PRETRACE(s->t, s); - s->remote_window_delta += size; - POSTTRACE(s->t, s, "s updt recv"); + PRETRACE(tfc, sfc); + sfc->remote_window_delta += size; + POSTTRACE(tfc, sfc, "s updt recv"); } -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport* t, - grpc_chttp2_stream* s, +void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc, + uint32_t sent_init_window, size_t max_size_hint, size_t have_already) { - PRETRACE(t, s); + PRETRACE(tfc, sfc); uint32_t max_recv_bytes; - uint32_t initial_window_size = - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ - if (max_size_hint >= UINT32_MAX - initial_window_size) { - max_recv_bytes = UINT32_MAX - initial_window_size; + if (max_size_hint >= UINT32_MAX - sent_init_window) { + max_recv_bytes = UINT32_MAX - sent_init_window; } else { max_recv_bytes = (uint32_t)max_size_hint; } @@ -318,34 +325,38 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport* t, } /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); - if (s->local_window_delta < max_recv_bytes && !s->read_closed) { + GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); + if (sfc->local_window_delta < max_recv_bytes) { uint32_t add_max_recv_bytes = - (uint32_t)(max_recv_bytes - s->local_window_delta); - s->local_window_delta += add_max_recv_bytes; + (uint32_t)(max_recv_bytes - sfc->local_window_delta); + sfc->local_window_delta += add_max_recv_bytes; } - POSTTRACE(t, s, "app st recv"); + POSTTRACE(tfc, sfc, "app st recv"); } -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_stream* s) { - announced_window_delta_preupdate(s->t, s); +void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { + announced_window_delta_preupdate(tfc, sfc); } grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - const grpc_chttp2_transport* t, const grpc_chttp2_stream* s) { + const grpc_chttp2_transport_flowctl* tfc, + const grpc_chttp2_stream_flowctl* sfc, bool stream_read_closed, + uint32_t sent_init_window) { grpc_chttp2_flowctl_action action; memset(&action, 0, sizeof(action)); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(t); - int64_t init_window = - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if (t->announced_window < target_announced_window / 2) { + uint32_t target_announced_window = + grpc_chttp2_target_announced_window(tfc, sent_init_window); + if (tfc->announced_window < target_announced_window / 2) { action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; } - if (s != NULL && !s->read_closed) { - if ((int64_t)s->local_window_delta > (int64_t)s->announced_window_delta && - (int64_t)s->announced_window_delta <= -init_window / 2) { + if (sfc != NULL && !stream_read_closed) { + if ((int64_t)sfc->local_window_delta > + (int64_t)sfc->announced_window_delta && + (int64_t)sfc->announced_window_delta + sent_init_window <= + sent_init_window / 2) { action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; - } else if (s->local_window_delta > s->announced_window_delta) { + } else if (sfc->local_window_delta > sfc->announced_window_delta) { action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 032f2ac426b..057d3d9ed33 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -201,13 +201,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[id] != parser->value) { - t->initial_window_update += + t->flow_control.initial_window_update += (int64_t)parser->value - parser->incoming_settings[id]; if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_flowctl_trace)) { gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", t, t->is_client ? "cli" : "svr", - (int)t->initial_window_update); + (int)t->flow_control.initial_window_update); } } parser->incoming_settings[id] = parser->value; diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 9cbadf4cfad..65f3b01d77f 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -95,8 +95,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse( if (t->incoming_stream_id != 0) { if (s != NULL) { - grpc_chttp2_flowctl_recv_stream_update(s, received_update); - // TODO(control bits) + grpc_chttp2_flowctl_recv_stream_update( + &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { grpc_chttp2_become_writable( exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, @@ -104,10 +104,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse( } } } else { - bool was_zero = t->remote_window <= 0; - grpc_chttp2_flowctl_recv_transport_update(t, received_update); - // TODO(control bits) - bool is_zero = t->remote_window <= 0; + bool was_zero = t->flow_control.remote_window <= 0; + grpc_chttp2_flowctl_recv_transport_update(&t->flow_control, + received_update); + bool is_zero = t->flow_control.remote_window <= 0; if (was_zero && !is_zero) { grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index fee4469a3a7..ba9949d11f4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -213,6 +213,37 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; +typedef struct { + /** initial window change. This is tracked as we parse settings frames from + * the remote peer. If there is a positive delta, then we will make all + * streams readable since they may have become unstalled */ + int64_t initial_window_update; + + /** Our bookkeeping for the remote peer's available window */ + int64_t remote_window; + + /** calculating what we should give for local window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t announced_stream_total_over_incoming_window; + int64_t announced_stream_total_under_incoming_window; + + /** This is out window according to what we have sent to our remote peer. The + * difference between this and target window is what we use to decide when + * to send WINDOW_UPDATE frames. */ + int64_t announced_window; + +// pointer back to transport for tracing +#ifndef NDEBUG + const grpc_chttp2_transport *t; +#endif +} grpc_chttp2_transport_flowctl; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -327,31 +358,7 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - /*********** Flow Control **************/ - - /** initial window change. This is tracked as we parse settings frames from - * the remote peer. If there is a positive delta, then we will make all - * streams readable since they may have become unstalled */ - int64_t initial_window_update; - - /** Our bookkeeping for the remote peer's available window */ - int64_t remote_window; - - /** calculating what we should give for local window: - we track the total amount of flow control over initial window size - across all streams: this is data that we want to receive right now (it - has an outstanding read) - and the total amount of flow control under initial window size across all - streams: this is data we've read early - we want to adjust incoming_window such that: - incoming_window = total_over - max(bdp - total_under, 0) */ - int64_t announced_stream_total_over_incoming_window; - int64_t announced_stream_total_under_incoming_window; - - /** This is out window according to what we have sent to our remote peer. The - * difference between this and target window is what we use to decide when - * to send WINDOW_UPDATE frames. */ - int64_t announced_window; + grpc_chttp2_transport_flowctl flow_control; /* bdp estimation */ grpc_bdp_estimator bdp_estimator; @@ -360,8 +367,6 @@ struct grpc_chttp2_transport { grpc_pid_controller pid_controller; gpr_timespec last_pid_update; - /*********** End of Flow Control **************/ - /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; uint8_t incoming_frame_type; @@ -437,6 +442,27 @@ typedef enum { GPRC_METADATA_PUBLISHED_AT_CLOSE } grpc_published_metadata_method; +typedef struct { + /** window available for us to send to peer, over or under the initial window + * size of the transport... ie: + * remote_window = remote_window_delta + transport.initial_window_size */ + int64_t remote_window_delta; + + /** window available for peer to send to us (as a delta on + * transport.initial_window_size) + * local_window = local_window_delta + transport.initial_window_size */ + int64_t local_window_delta; + + /** window available for peer to send to us over this stream that we have + * announced to the peer */ + int64_t announced_window_delta; + +// pointer back to stream for tracing +#ifndef NDEBUG + const grpc_chttp2_stream *s; +#endif +} grpc_chttp2_stream_flowctl; + struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -527,23 +553,7 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - /*********** Flow Control ***********/ - - /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * remote_window = remote_window_delta + transport.initial_window_size */ - int64_t remote_window_delta; - - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * local_window = local_window_delta + transport.initial_window_size */ - int64_t local_window_delta; - - /** window available for peer to send to us over this stream that we have - * announced to the peer */ - int64_t announced_window_delta; - - /*********** End of Flow Control ***********/ + grpc_chttp2_stream_flowctl flow_control; grpc_slice_buffer flow_controlled_buffer; @@ -628,33 +638,43 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, /********* Flow Control ***************/ // we have sent data on the wire -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport *t, - grpc_chttp2_stream *s, int64_t size); +void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + int64_t size); // we have received data from the wire -grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - int64_t incoming_frame_size); +grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + int64_t incoming_frame_size, + uint32_t acked_init_window, + uint32_t sent_init_window); uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport *t); + grpc_chttp2_transport_flowctl *tfc, uint32_t acked_init_window, + bool has_outbuf); -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(grpc_chttp2_stream *s); +uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( + grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); // we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update(grpc_chttp2_transport *t, - uint32_t size); +void grpc_chttp2_flowctl_recv_transport_update( + grpc_chttp2_transport_flowctl *tfc, uint32_t size); // we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_stream *s, +void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, uint32_t size); // the application is asking for a certain amount of bytes -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport *t, - grpc_chttp2_stream *s, +void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc, + uint32_t initial_window_size, size_t max_size_hint, size_t have_already); +void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc); + typedef enum { // Nothing to be done. GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0, @@ -671,15 +691,15 @@ typedef struct { } grpc_chttp2_flowctl_action; grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - const grpc_chttp2_transport *t, const grpc_chttp2_stream *s); + const grpc_chttp2_transport_flowctl *tfc, + const grpc_chttp2_stream_flowctl *sfc, bool stream_read_closed, + uint32_t acked_init_window); void grpc_chttp2_flowctl_act_on_action(grpc_exec_ctx *exec_ctx, grpc_chttp2_flowctl_action action, grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_stream *s); - /********* End of Flow Control ***************/ grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index dd1979fb6de..c03d4758d60 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -354,15 +354,27 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - err = grpc_chttp2_flowctl_recv_data(t, s, t->incoming_frame_size); - grpc_chttp2_flowctl_act_on_action(exec_ctx, - grpc_chttp2_flowctl_get_action(t, s), t, s); + err = grpc_chttp2_flowctl_recv_data( + &t->flow_control, s == NULL ? NULL : &s->flow_control, + t->incoming_frame_size, + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_chttp2_flowctl_act_on_action( + exec_ctx, grpc_chttp2_flowctl_get_action( + &t->flow_control, s == NULL ? NULL : &s->flow_control, + s == NULL ? false : s->read_closed, + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; } if (s == NULL) { return init_skip_frame_parser(exec_ctx, t, 0); } + s->received_bytes += t->incoming_frame_size; s->stats.incoming.framing_bytes += 9; if (err == GRPC_ERROR_NONE && s->read_closed) { return init_skip_frame_parser(exec_ctx, t, 0); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 9f4c2865151..8c2e763330d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -192,7 +192,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &t->hpack_compressor, t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - if (t->remote_window > 0) { + if (t->flow_control.remote_window > 0) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && stream_ref_if_not_destroyed(&s->refcount->refs)) { @@ -222,7 +222,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata, s->send_initial_metadata != NULL, - (int)(s->local_window_delta - s->announced_window_delta))); + (int)(s->flow_control.local_window_delta - + s->flow_control.announced_window_delta))); grpc_mdelem *extra_headers_for_trailing_metadata[2]; size_t num_extra_headers_for_trailing_metadata = 0; @@ -279,7 +280,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( sent_initial_metadata = true; } /* send any window updates */ - uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(s); + uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( + &t->flow_control, &s->flow_control); if (stream_announce > 0) { grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce, @@ -297,13 +299,13 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (s->flow_controlled_buffer.length > 0) { uint32_t stream_remote_window = (uint32_t)GPR_MAX( 0, - s->remote_window_delta + + s->flow_control.remote_window_delta + (int64_t)t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - uint32_t max_outgoing = - (uint32_t)GPR_MIN(t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - GPR_MIN(stream_remote_window, t->remote_window)); + uint32_t max_outgoing = (uint32_t)GPR_MIN( + t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + GPR_MIN(stream_remote_window, t->flow_control.remote_window)); if (max_outgoing > 0) { uint32_t send_bytes = (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); @@ -316,7 +318,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf); - grpc_chttp2_flowctl_sent_data(t, s, send_bytes); + grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, + send_bytes); t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; if (!t->is_client) { @@ -339,7 +342,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } - } else if (t->remote_window == 0) { + } else if (t->flow_control.remote_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; } else if (stream_remote_window == 0) { @@ -394,8 +397,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } - uint32_t transport_announce = - grpc_chttp2_flowctl_maybe_send_transport_update(t); + uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update( + &t->flow_control, t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + t->outbuf.count > 0); if (transport_announce) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 9193330f783..cb113c5254f 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -391,8 +391,9 @@ static void BM_TransportStreamSend(benchmark::State &state) { MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; // force outgoing window to be yuge - s.chttp2_stream()->remote_window_delta = 1024 * 1024 * 1024; - f.chttp2_transport()->remote_window = 1024 * 1024 * 1024; + s.chttp2_stream()->flow_control.remote_window_delta = + 1024 * 1024 * 1024; + f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); reset_op(); op.on_complete = c.get(); @@ -517,22 +518,22 @@ static void BM_TransportStreamRecv(benchmark::State &state) { std::unique_ptr drain_continue; grpc_slice recv_slice; - std::unique_ptr c = - MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; - // force outgoing window to be yuge - s.chttp2_stream()->local_window_delta = 1024 * 1024 * 1024; - s.chttp2_stream()->announced_window_delta = 1024 * 1024 * 1024; - f.chttp2_transport()->announced_window = 1024 * 1024 * 1024; - received = 0; - reset_op(); - op.on_complete = do_nothing.get(); - op.recv_message = true; - op.payload->recv_message.recv_message = &recv_stream; - op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); - f.PushInput(grpc_slice_ref(incoming_data)); - }); + std::unique_ptr c = MakeClosure([&](grpc_exec_ctx *exec_ctx, + grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024; + s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024; + received = 0; + reset_op(); + op.on_complete = do_nothing.get(); + op.recv_message = true; + op.payload->recv_message.recv_message = &recv_stream; + op.payload->recv_message.recv_message_ready = drain_start.get(); + s.Op(&op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (recv_stream == NULL) { diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 13c96ef3e15..5c44b9751f6 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -127,14 +127,15 @@ class TrickledCHTTP2 : public EndpointPairFixture { client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, - client->remote_window, server->remote_window, client->announced_window, - server->announced_window, - client_stream ? client_stream->remote_window_delta : -1, - server_stream ? server_stream->remote_window_delta : -1, - client_stream ? client_stream->local_window_delta : -1, - server_stream ? server_stream->local_window_delta : -1, - client_stream ? client_stream->announced_window_delta : -1, - server_stream ? server_stream->announced_window_delta : -1, + client->flow_control.remote_window, server->flow_control.remote_window, + client->flow_control.announced_window, + server->flow_control.announced_window, + client_stream ? client_stream->flow_control.remote_window_delta : -1, + server_stream ? server_stream->flow_control.remote_window_delta : -1, + client_stream ? client_stream->flow_control.local_window_delta : -1, + server_stream ? server_stream->flow_control.local_window_delta : -1, + client_stream ? client_stream->flow_control.announced_window_delta : -1, + server_stream ? server_stream->flow_control.announced_window_delta : -1, client->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], client->settings[GRPC_LOCAL_SETTINGS]