Add read only refs to transport and stream

pull/11720/head
ncteisen 7 years ago
parent 3c909d55fd
commit 00f74a914a
  1. 31
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 52
      src/core/ext/transport/chttp2/transport/flow_control.c
  3. 19
      src/core/ext/transport/chttp2/transport/internal.h
  4. 15
      src/core/ext/transport/chttp2/transport/parsing.c
  5. 6
      src/core/ext/transport/chttp2/transport/writing.c

@ -267,9 +267,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->is_client = is_client;
t->flow_control.remote_window = DEFAULT_WINDOW;
t->flow_control.announced_window = DEFAULT_WINDOW;
#ifndef NDEBUG
t->flow_control.t = t;
#endif
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@ -708,10 +706,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
#ifndef NDEBUG
s->flow_control.s = s;
#endif
GPR_TIMER_END("init_stream", 0);
return 0;
@ -1458,16 +1453,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(
&t->flow_control, &s->flow_control,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
5, already_received);
&t->flow_control, &s->flow_control, 5, already_received);
grpc_chttp2_flowctl_act_on_action(
exec_ctx,
grpc_chttp2_flowctl_get_action(
&t->flow_control, &s->flow_control, false,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control),
t, s);
}
}
@ -2562,17 +2551,13 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
grpc_chttp2_flowctl_incoming_bs_update(
&t->flow_control, &s->flow_control,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
bs->next_action.max_size_hint, cur_length);
grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
bs->next_action.max_size_hint,
cur_length);
grpc_chttp2_flowctl_act_on_action(
exec_ctx, grpc_chttp2_flowctl_get_action(
&t->flow_control, &s->flow_control, false,
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, s);
exec_ctx,
grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t,
s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {

@ -28,7 +28,7 @@
#include "src/core/lib/support/string.h"
static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_local_window);
const grpc_chttp2_transport_flowctl* tfc);
#ifndef NDEBUG
@ -45,9 +45,7 @@ static void pretrace(shadow_flow_control* shadow_fc,
grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc) {
shadow_fc->remote_window = tfc->remote_window;
shadow_fc->target_window = grpc_chttp2_target_announced_window(
tfc, tfc->t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
shadow_fc->announced_window = tfc->announced_window;
if (sfc != NULL) {
shadow_fc->remote_window_delta = sfc->remote_window_delta;
@ -78,9 +76,8 @@ static void posttrace(shadow_flow_control* shadow_fc,
tfc->t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
char* tlw_str =
fmt_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc, acked_local_window));
char* tlw_str = fmt_str(shadow_fc->target_window,
grpc_chttp2_target_announced_window(tfc));
char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str;
char* slw_str;
@ -143,10 +140,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) {
/* How many bytes of incoming flow control would we like to advertise */
static uint32_t grpc_chttp2_target_announced_window(
const grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window) {
const grpc_chttp2_transport_flowctl* tfc) {
return (uint32_t)GPR_MIN(
(int64_t)((1u << 31) - 1),
tfc->announced_stream_total_over_incoming_window + acked_init_window);
tfc->announced_stream_total_over_incoming_window +
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
// we have sent data on the wire, we must track this in our bookkeeping for the
@ -187,9 +186,13 @@ static void announced_window_delta_postupdate(
// Returns an error if the incoming frame violates our flow control.
grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
int64_t incoming_frame_size,
uint32_t acked_init_window,
uint32_t sent_init_window) {
int64_t incoming_frame_size) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t acked_init_window =
tfc->t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
PRETRACE(tfc, sfc);
if (incoming_frame_size > tfc->announced_window) {
char* msg;
@ -244,14 +247,12 @@ grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
// Returns a non zero announce integer if we should send a transport window
// update
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl* tfc, uint32_t acked_init_window,
bool has_outbuf) {
grpc_chttp2_transport_flowctl* tfc) {
PRETRACE(tfc, NULL);
uint32_t target_announced_window =
grpc_chttp2_target_announced_window(tfc, acked_init_window);
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
uint32_t threshold_to_send_transport_window_update =
has_outbuf ? 3 * target_announced_window / 4
: target_announced_window / 2;
tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
: target_announced_window / 2;
if (tfc->announced_window <= threshold_to_send_transport_window_update &&
tfc->announced_window != target_announced_window) {
uint32_t announce = (uint32_t)GPR_CLAMP(
@ -304,11 +305,13 @@ void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_stream_flowctl* sfc,
uint32_t sent_init_window,
size_t max_size_hint,
size_t have_already) {
PRETRACE(tfc, sfc);
uint32_t max_recv_bytes;
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - sent_init_window) {
@ -341,16 +344,17 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl* tfc,
const grpc_chttp2_stream_flowctl* sfc, bool stream_read_closed,
uint32_t sent_init_window) {
const grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
uint32_t target_announced_window =
grpc_chttp2_target_announced_window(tfc, sent_init_window);
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
}
if (sfc != NULL && !stream_read_closed) {
if (sfc != NULL && !sfc->s->read_closed) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
if ((int64_t)sfc->local_window_delta >
(int64_t)sfc->announced_window_delta &&
(int64_t)sfc->announced_window_delta + sent_init_window <=

@ -238,10 +238,8 @@ typedef struct {
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
// pointer back to transport for tracing
#ifndef NDEBUG
// read only pointer back to transport for certain data
const grpc_chttp2_transport *t;
#endif
} grpc_chttp2_transport_flowctl;
struct grpc_chttp2_transport {
@ -457,10 +455,8 @@ typedef struct {
* announced to the peer */
int64_t announced_window_delta;
// pointer back to stream for tracing
#ifndef NDEBUG
// read only pointer back to stream for data
const grpc_chttp2_stream *s;
#endif
} grpc_chttp2_stream_flowctl;
struct grpc_chttp2_stream {
@ -645,13 +641,10 @@ void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
// we have received data from the wire
grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
int64_t incoming_frame_size,
uint32_t acked_init_window,
uint32_t sent_init_window);
int64_t incoming_frame_size);
uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
grpc_chttp2_transport_flowctl *tfc, uint32_t acked_init_window,
bool has_outbuf);
grpc_chttp2_transport_flowctl *tfc);
uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
@ -668,7 +661,6 @@ void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
// the application is asking for a certain amount of bytes
void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
grpc_chttp2_stream_flowctl *sfc,
uint32_t initial_window_size,
size_t max_size_hint,
size_t have_already);
@ -692,8 +684,7 @@ typedef struct {
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
const grpc_chttp2_transport_flowctl *tfc,
const grpc_chttp2_stream_flowctl *sfc, bool stream_read_closed,
uint32_t acked_init_window);
const grpc_chttp2_stream_flowctl *sfc);
void grpc_chttp2_flowctl_act_on_action(grpc_exec_ctx *exec_ctx,
grpc_chttp2_flowctl_action action,

@ -354,19 +354,12 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
err = grpc_chttp2_flowctl_recv_data(
&t->flow_control, s == NULL ? NULL : &s->flow_control,
t->incoming_frame_size,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
err = grpc_chttp2_flowctl_recv_data(&t->flow_control,
s == NULL ? NULL : &s->flow_control,
t->incoming_frame_size);
grpc_chttp2_flowctl_act_on_action(
exec_ctx, grpc_chttp2_flowctl_get_action(
&t->flow_control, s == NULL ? NULL : &s->flow_control,
s == NULL ? false : s->read_closed,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
&t->flow_control, s == NULL ? NULL : &s->flow_control),
t, s);
if (err != GRPC_ERROR_NONE) {
goto error_handler;

@ -397,10 +397,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update(
&t->flow_control, t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->outbuf.count > 0);
uint32_t transport_announce =
grpc_chttp2_flowctl_maybe_send_transport_update(&t->flow_control);
if (transport_announce) {
maybe_initiate_ping(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);

Loading…
Cancel
Save