Fix flow control issue, make debugging in the future easier

reviewable/pr6737/r3
Craig Tiller 9 years ago
parent 79a904a536
commit c0e73da8c2
  1. 47
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 7
      src/core/ext/transport/chttp2/transport/internal.h
  3. 6
      src/core/ext/transport/chttp2/transport/parsing.c
  4. 12
      src/core/ext/transport/chttp2/transport/stream_lists.c
  5. 6
      src/core/ext/transport/chttp2/transport/writing.c

@ -295,7 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_add(
&t->global.qbuf,
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
grpc_chttp2_initiate_write(exec_ctx, &t->global, false);
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@ -799,14 +799,14 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller) {
bool covered_by_poller, const char *reason) {
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITING_INACTIVE:
set_write_state(t, covered_by_poller
? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
: GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
"initiate_write");
reason);
break;
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
/* nothing to do: write already requested */
@ -815,7 +815,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
"initiate_write");
reason);
}
break;
case GRPC_CHTTP2_WRITE_SCHEDULED:
@ -825,7 +825,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
: GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
"initiate_write");
reason);
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
/* nothing to do: write already requested */
@ -834,7 +834,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER,
"initiate_write");
reason);
}
break;
}
@ -881,11 +881,11 @@ static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
bool covered_by_poller) {
bool covered_by_poller, const char *reason) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller);
grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, reason);
}
}
@ -901,7 +901,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->global.dirtied_local_settings = 1;
grpc_chttp2_initiate_write(exec_ctx, &t->global, false);
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting");
}
}
@ -1040,7 +1040,7 @@ static void maybe_start_some_streams(
stream_global->in_stream_map = true;
transport_global->concurrent_stream_count++;
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
true);
true, "new_stream");
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@ -1176,7 +1176,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
GPR_ASSERT(stream_global->id != 0);
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
true);
true, "op.send_initial_metadata");
}
} else {
stream_global->send_trailing_metadata = NULL;
@ -1202,7 +1202,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->send_message = op->send_message;
if (stream_global->id != 0) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
true);
true, "op.send_message");
}
}
}
@ -1247,7 +1247,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
true);
true, "op.send_trailing_metadata");
}
}
}
@ -1313,7 +1313,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff);
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
grpc_chttp2_initiate_write(exec_ctx, &t->global, true);
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@ -1373,7 +1373,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
close_transport = grpc_chttp2_has_streams(t)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("GOAWAY sent");
grpc_chttp2_initiate_write(exec_ctx, &t->global, false);
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent");
}
if (op->set_accept_stream) {
@ -1578,7 +1578,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
&stream_global->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, transport_global, false);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "rst_stream");
}
const char *msg =
@ -1844,7 +1844,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, error);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "close_from_api");
}
typedef struct {
@ -1896,7 +1896,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
if (was_zero && !is_zero) {
grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global,
true);
true, "update_global_window");
}
}
@ -2007,7 +2007,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* copy parsing qbuf to global qbuf */
if (t->parsing.qbuf.count > 0) {
gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "parsing_qbuf");
}
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
@ -2176,6 +2176,7 @@ static void incoming_byte_stream_update_flow_control(
if (stream_global->max_recv_bytes < max_recv_bytes) {
uint32_t add_max_recv_bytes =
max_recv_bytes - stream_global->max_recv_bytes;
gpr_log(GPR_DEBUG, "add_max_recv_bytes:%d", add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
@ -2187,7 +2188,7 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false);
false, "read_incoming_stream");
}
}
@ -2395,7 +2396,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
if (context == NULL) {
*scope = NULL;
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val);
result = gpr_leftpad(buf, ' ', 40);
result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@ -2408,7 +2409,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_free(tmp);
}
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val);
result = gpr_leftpad(buf, ' ', 40);
result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@ -2441,7 +2442,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
tmp_phase = gpr_leftpad(phase, ' ', 8);
tmp_scope1 = gpr_leftpad(scope1, ' ', 11);
gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1);
gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1);
gpr_free(tmp_phase);
gpr_free(tmp_scope1);

@ -542,7 +542,7 @@ struct grpc_chttp2_stream {
should be performed. */
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller);
bool covered_by_poller, const char *reason);
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
@ -631,8 +631,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available);
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
@ -845,6 +844,6 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
bool covered_by_poller);
bool covered_by_poller, const char *reason);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */

@ -157,7 +157,7 @@ void grpc_chttp2_publish_reads(
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false);
false, "transport.read_flow_control");
}
}
@ -169,7 +169,7 @@ void grpc_chttp2_publish_reads(
announce_incoming_window, announce_bytes);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
incoming_window, announce_bytes);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false);
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "global incoming window");
}
/* for each stream that saw an update, fixup global state */
@ -193,7 +193,7 @@ void grpc_chttp2_publish_reads(
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false);
false, "stream.read_flow_control");
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(

@ -337,19 +337,13 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
}
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available) {
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (is_window_available) {
grpc_chttp2_become_writable(exec_ctx, &transport->global, &stream->global,
true);
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
}
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
}

@ -75,9 +75,6 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
bool is_window_available = transport_writing->outgoing_window > 0;
grpc_chttp2_list_flush_writing_stalled_by_transport(
exec_ctx, transport_writing, is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@ -331,6 +328,9 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
transport_writing);
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {

Loading…
Cancel
Save