From 00f74a914aa7971c628ce3810029952024753e41 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 14 Jul 2017 15:41:05 -0700 Subject: [PATCH] Add read only refs to transport and stream --- .../chttp2/transport/chttp2_transport.c | 31 +++-------- .../transport/chttp2/transport/flow_control.c | 52 ++++++++++--------- .../ext/transport/chttp2/transport/internal.h | 19 ++----- .../ext/transport/chttp2/transport/parsing.c | 15 ++---- .../ext/transport/chttp2/transport/writing.c | 6 +-- 5 files changed, 47 insertions(+), 76 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index df8fc1b15bd..18b2725aae0 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -267,9 +267,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->is_client = is_client; t->flow_control.remote_window = DEFAULT_WINDOW; t->flow_control.announced_window = DEFAULT_WINDOW; -#ifndef NDEBUG t->flow_control.t = t; -#endif t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -708,10 +706,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } -#ifndef NDEBUG s->flow_control.s = s; -#endif - GPR_TIMER_END("init_stream", 0); return 0; @@ -1458,16 +1453,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->read_closed) { grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 5, already_received); + &t->flow_control, &s->flow_control, 5, already_received); grpc_chttp2_flowctl_act_on_action( exec_ctx, - grpc_chttp2_flowctl_get_action( - &t->flow_control, &s->flow_control, false, - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, s); } } @@ -2562,17 +2551,13 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - bs->next_action.max_size_hint, cur_length); + grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, + bs->next_action.max_size_hint, + cur_length); grpc_chttp2_flowctl_act_on_action( - exec_ctx, grpc_chttp2_flowctl_get_action( - &t->flow_control, &s->flow_control, false, - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), - t, s); + exec_ctx, + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, + s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index 983c8880b45..a3e077fdf23 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -28,7 +28,7 @@ #include "src/core/lib/support/string.h" static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_local_window); + const grpc_chttp2_transport_flowctl* tfc); #ifndef NDEBUG @@ -45,9 +45,7 @@ static void pretrace(shadow_flow_control* shadow_fc, grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { shadow_fc->remote_window = tfc->remote_window; - shadow_fc->target_window = grpc_chttp2_target_announced_window( - tfc, tfc->t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc); shadow_fc->announced_window = tfc->announced_window; if (sfc != NULL) { shadow_fc->remote_window_delta = sfc->remote_window_delta; @@ -78,9 +76,8 @@ static void posttrace(shadow_flow_control* shadow_fc, tfc->t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window); - char* tlw_str = - fmt_str(shadow_fc->target_window, - grpc_chttp2_target_announced_window(tfc, acked_local_window)); + char* tlw_str = fmt_str(shadow_fc->target_window, + grpc_chttp2_target_announced_window(tfc)); char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window); char* srw_str; char* slw_str; @@ -143,10 +140,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) { /* How many bytes of incoming flow control would we like to advertise */ static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window) { + const grpc_chttp2_transport_flowctl* tfc) { return (uint32_t)GPR_MIN( (int64_t)((1u << 31) - 1), - tfc->announced_stream_total_over_incoming_window + acked_init_window); + tfc->announced_stream_total_over_incoming_window + + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } // we have sent data on the wire, we must track this in our bookkeeping for the @@ -187,9 +186,13 @@ static void announced_window_delta_postupdate( // Returns an error if the incoming frame violates our flow control. grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc, - int64_t incoming_frame_size, - uint32_t acked_init_window, - uint32_t sent_init_window) { + int64_t incoming_frame_size) { + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t acked_init_window = + tfc->t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; PRETRACE(tfc, sfc); if (incoming_frame_size > tfc->announced_window) { char* msg; @@ -244,14 +247,12 @@ grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, // Returns a non zero announce integer if we should send a transport window // update uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window, - bool has_outbuf) { + grpc_chttp2_transport_flowctl* tfc) { PRETRACE(tfc, NULL); - uint32_t target_announced_window = - grpc_chttp2_target_announced_window(tfc, acked_init_window); + uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); uint32_t threshold_to_send_transport_window_update = - has_outbuf ? 3 * target_announced_window / 4 - : target_announced_window / 2; + tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 + : target_announced_window / 2; if (tfc->announced_window <= threshold_to_send_transport_window_update && tfc->announced_window != target_announced_window) { uint32_t announce = (uint32_t)GPR_CLAMP( @@ -304,11 +305,13 @@ void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc, - uint32_t sent_init_window, size_t max_size_hint, size_t have_already) { PRETRACE(tfc, sfc); uint32_t max_recv_bytes; + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ if (max_size_hint >= UINT32_MAX - sent_init_window) { @@ -341,16 +344,17 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( const grpc_chttp2_transport_flowctl* tfc, - const grpc_chttp2_stream_flowctl* sfc, bool stream_read_closed, - uint32_t sent_init_window) { + const grpc_chttp2_stream_flowctl* sfc) { grpc_chttp2_flowctl_action action; memset(&action, 0, sizeof(action)); - uint32_t target_announced_window = - grpc_chttp2_target_announced_window(tfc, sent_init_window); + uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); if (tfc->announced_window < target_announced_window / 2) { action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; } - if (sfc != NULL && !stream_read_closed) { + if (sfc != NULL && !sfc->s->read_closed) { + uint32_t sent_init_window = + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; if ((int64_t)sfc->local_window_delta > (int64_t)sfc->announced_window_delta && (int64_t)sfc->announced_window_delta + sent_init_window <= diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ba9949d11f4..21ba75f4c08 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -238,10 +238,8 @@ typedef struct { * to send WINDOW_UPDATE frames. */ int64_t announced_window; -// pointer back to transport for tracing -#ifndef NDEBUG + // read only pointer back to transport for certain data const grpc_chttp2_transport *t; -#endif } grpc_chttp2_transport_flowctl; struct grpc_chttp2_transport { @@ -457,10 +455,8 @@ typedef struct { * announced to the peer */ int64_t announced_window_delta; -// pointer back to stream for tracing -#ifndef NDEBUG + // read only pointer back to stream for data const grpc_chttp2_stream *s; -#endif } grpc_chttp2_stream_flowctl; struct grpc_chttp2_stream { @@ -645,13 +641,10 @@ void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc, // we have received data from the wire grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc, - int64_t incoming_frame_size, - uint32_t acked_init_window, - uint32_t sent_init_window); + int64_t incoming_frame_size); uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl *tfc, uint32_t acked_init_window, - bool has_outbuf); + grpc_chttp2_transport_flowctl *tfc); uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); @@ -668,7 +661,6 @@ void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc, // the application is asking for a certain amount of bytes void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc, - uint32_t initial_window_size, size_t max_size_hint, size_t have_already); @@ -692,8 +684,7 @@ typedef struct { grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( const grpc_chttp2_transport_flowctl *tfc, - const grpc_chttp2_stream_flowctl *sfc, bool stream_read_closed, - uint32_t acked_init_window); + const grpc_chttp2_stream_flowctl *sfc); void grpc_chttp2_flowctl_act_on_action(grpc_exec_ctx *exec_ctx, grpc_chttp2_flowctl_action action, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index c03d4758d60..ec006ff0e1b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -354,19 +354,12 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - err = grpc_chttp2_flowctl_recv_data( - &t->flow_control, s == NULL ? NULL : &s->flow_control, - t->incoming_frame_size, - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + err = grpc_chttp2_flowctl_recv_data(&t->flow_control, + s == NULL ? NULL : &s->flow_control, + t->incoming_frame_size); grpc_chttp2_flowctl_act_on_action( exec_ctx, grpc_chttp2_flowctl_get_action( - &t->flow_control, s == NULL ? NULL : &s->flow_control, - s == NULL ? false : s->read_closed, - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + &t->flow_control, s == NULL ? NULL : &s->flow_control), t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 8c2e763330d..726af39a2ed 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -397,10 +397,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } - uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update( - &t->flow_control, t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - t->outbuf.count > 0); + uint32_t transport_announce = + grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control); if (transport_announce) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);