diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index c1f6df6aa48..0ad62a9999b 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -133,8 +133,13 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, + stream_parsing); return GRPC_CHTTP2_PARSE_OK; - } else if ((gpr_uint32)(end - cur) == p->frame_size) { + } + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, + stream_parsing); + if ((gpr_uint32)(end - cur) == p->frame_size) { grpc_sopb_add_slice(&p->incoming_sopb, gpr_slice_sub(slice, cur - beg, end - beg)); p->state = GRPC_CHTTP2_DATA_FH_0; diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index 6c963aa44dd..b817df77459 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -96,9 +96,16 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( if (transport_parsing->incoming_stream_id) { if (stream_parsing) { + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, + stream_parsing, outgoing_window_update, + p->amount); stream_parsing->outgoing_window_update += p->amount; + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, + stream_parsing); } } else { + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("update", transport_parsing, + outgoing_window_update, p->amount); transport_parsing->outgoing_window_update += p->amount; } } diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 4b11d46cfed..b8ab664db50 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1395,6 +1395,8 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( &stream_parsing->incoming_metadata, &stream_parsing->data_parser.incoming_sopb); + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, + stream_parsing); } if (parser->is_eof) { stream_parsing->received_close = 1; diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 87b0a23795e..ea5215b1bd9 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -71,7 +71,8 @@ void grpc_chttp2_incoming_metadata_live_op_buffer_end( buffer->elems = NULL; } -void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) { +void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) { grpc_metadata_batch b; b.list.head = NULL; @@ -79,7 +80,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_ we can reconstitute the list. We can't do list building here as later incoming metadata may reallocate the underlying array. */ - b.list.tail = (void*)(gpr_intptr)buffer->count; + b.list.tail = (void *)(gpr_intptr)buffer->count; b.garbage.head = b.garbage.tail = NULL; b.deadline = buffer->deadline; buffer->deadline = gpr_inf_future; @@ -87,14 +88,15 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_ grpc_sopb_add_metadata(sopb, b); } -void grpc_chttp2_incoming_metadata_buffer_swap(grpc_chttp2_incoming_metadata_buffer *a, grpc_chttp2_incoming_metadata_buffer *b) { +void grpc_chttp2_incoming_metadata_buffer_swap( + grpc_chttp2_incoming_metadata_buffer *a, + grpc_chttp2_incoming_metadata_buffer *b) { GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b); } void grpc_incoming_metadata_buffer_move_to_referencing_sopb( - grpc_chttp2_incoming_metadata_buffer *src, - grpc_chttp2_incoming_metadata_buffer *dst, - grpc_stream_op_buffer *sopb) { + grpc_chttp2_incoming_metadata_buffer *src, + grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) { size_t delta; size_t i; if (gpr_time_cmp(dst->deadline, gpr_inf_future) == 0) { @@ -119,7 +121,8 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb( dst->count += src->count; for (i = 0; i < sopb->nops; i++) { if (sopb->ops[i].type != GRPC_OP_METADATA) continue; - sopb->ops[i].data.metadata.list.tail = (void*)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); + sopb->ops[i].data.metadata.list.tail = + (void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); } } diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h index bc7e3816bcb..2f1de411bae 100644 --- a/src/core/transport/chttp2/incoming_metadata.h +++ b/src/core/transport/chttp2/incoming_metadata.h @@ -67,9 +67,8 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); void grpc_incoming_metadata_buffer_move_to_referencing_sopb( - grpc_chttp2_incoming_metadata_buffer *src, - grpc_chttp2_incoming_metadata_buffer *dst, - grpc_stream_op_buffer *sopb); + grpc_chttp2_incoming_metadata_buffer *src, + grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb); void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 644523e8104..96390fe7ce5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -218,6 +218,8 @@ typedef struct { gpr_slice_buffer outbuf; /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; + /** is this a client? */ + gpr_uint8 is_client; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { @@ -252,6 +254,7 @@ struct grpc_chttp2_transport_parsing { /** window available for peer to send to us */ gpr_uint32 incoming_window; + gpr_uint32 incoming_window_delta; /** next stream id available at the time of beginning parsing */ gpr_uint32 next_stream_id; @@ -601,13 +604,15 @@ void grpc_chttp2_for_all_streams( void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)); -void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing); +void grpc_chttp2_parsing_become_skip_parser( + grpc_chttp2_transport_parsing *transport_parsing); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) extern int grpc_http_trace; +extern int grpc_flowctl_trace; #define IF_TRACING(stmt) \ if (!(grpc_http_trace)) \ @@ -615,4 +620,25 @@ extern int grpc_http_trace; else \ stmt +#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ + delta) \ + if (!(grpc_flowctl_trace)) { \ + } else { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ + transport->is_client, context->id, context->var, \ + delta); \ + } + +#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ + if (!(grpc_flowctl_trace)) { \ + } else { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ + context->is_client, 0, context->var, delta); \ + } + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, + const char *context, const char *var, + int is_client, gpr_uint32 stream_id, + gpr_int64 current_value, gpr_int64 delta); + #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 93e8dcc1c8e..f33b54f167c 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -60,16 +60,30 @@ static int init_skip_frame_parser( static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); -void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing) { +void grpc_chttp2_prepare_to_read( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_parsing *transport_parsing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_parsing *stream_parsing; /* update the parsing view of incoming window */ - transport_parsing->incoming_window = transport_global->incoming_window; + if (transport_parsing->incoming_window != transport_global->incoming_window) { + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "parse", transport_parsing, incoming_window, + (gpr_int64)transport_global->incoming_window - + (gpr_int64)transport_parsing->incoming_window); + transport_parsing->incoming_window = transport_global->incoming_window; + } while (grpc_chttp2_list_pop_incoming_window_updated( transport_global, transport_parsing, &stream_global, &stream_parsing)) { - stream_parsing->incoming_window = transport_parsing->incoming_window; + stream_parsing->id = stream_global->id; + if (stream_parsing->incoming_window != stream_global->incoming_window) { + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parse", transport_parsing, stream_parsing, incoming_window, + (gpr_int64)stream_global->incoming_window - + (gpr_int64)stream_parsing->incoming_window); + stream_parsing->incoming_window = stream_global->incoming_window; + } } } @@ -95,33 +109,6 @@ void grpc_chttp2_publish_reads( /* TODO(ctiller): re-implement */ GPR_ASSERT(transport_parsing->initial_window_update == 0); -#if 0 - while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { - int publish = 0; - GPR_ASSERT(s->incoming_sopb); - *s->publish_state = - compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); - if (*s->publish_state != s->published_state) { - s->published_state = *s->publish_state; - publish = 1; - if (s->published_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - } - } - if (s->parser.incoming_sopb.nops > 0) { - grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); - publish = 1; - } - if (publish) { - if (s->incoming_metadata_count > 0) { - patch_metadata_ops(s); - } - s->incoming_sopb = NULL; - schedule_cb(t, s->global.recv_done_closure, 1); - } - } -#endif - /* copy parsing qbuf to global qbuf */ gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); @@ -149,11 +136,42 @@ void grpc_chttp2_publish_reads( transport_parsing->goaway_received = 0; } + /* propagate flow control tokens to global state */ + if (transport_parsing->outgoing_window_update) { + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "parsed", transport_global, outgoing_window, + transport_parsing->outgoing_window_update); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "parsed", transport_parsing, outgoing_window_update, + -(gpr_int64)transport_parsing->outgoing_window_update); + transport_global->outgoing_window += + transport_parsing->outgoing_window_update; + transport_parsing->outgoing_window_update = 0; + } + + if (transport_parsing->incoming_window_delta) { + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "parsed", transport_global, incoming_window, + -(gpr_int64)transport_parsing->incoming_window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "parsed", transport_parsing, incoming_window_delta, + -(gpr_int64)transport_parsing->incoming_window_delta); + transport_global->incoming_window -= + transport_parsing->incoming_window_delta; + transport_parsing->incoming_window_delta = 0; + } + /* for each stream that saw an update, fixup global state */ while (grpc_chttp2_list_pop_parsing_seen_stream( transport_global, transport_parsing, &stream_global, &stream_parsing)) { /* update incoming flow control window */ if (stream_parsing->incoming_window_delta) { + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_global, incoming_window, + -(gpr_int64)stream_parsing->incoming_window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_parsing, incoming_window_delta, + -(gpr_int64)stream_parsing->incoming_window_delta); stream_global->incoming_window -= stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; grpc_chttp2_list_add_writable_window_update_stream(transport_global, @@ -164,9 +182,16 @@ void grpc_chttp2_publish_reads( if (stream_parsing->outgoing_window_update) { int was_zero = stream_global->outgoing_window <= 0; int is_zero; + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing, + stream_global, outgoing_window, + stream_parsing->outgoing_window_update); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_parsing, outgoing_window_update, + -(gpr_int64)stream_parsing->outgoing_window_update); stream_global->outgoing_window += stream_parsing->outgoing_window_update; stream_parsing->outgoing_window_update = 0; is_zero = stream_global->outgoing_window <= 0; + gpr_log(GPR_DEBUG, "was=%d is=%d", was_zero, is_zero); if (was_zero && !is_zero) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } @@ -186,8 +211,11 @@ void grpc_chttp2_publish_reads( /* publish incoming stream ops */ if (stream_parsing->data_parser.incoming_sopb.nops > 0) { - grpc_incoming_metadata_buffer_move_to_referencing_sopb(&stream_parsing->incoming_metadata, &stream_global->incoming_metadata, &stream_parsing->data_parser.incoming_sopb); - grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, &stream_global->incoming_sopb); + grpc_incoming_metadata_buffer_move_to_referencing_sopb( + &stream_parsing->incoming_metadata, &stream_global->incoming_metadata, + &stream_parsing->data_parser.incoming_sopb); + grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, + &stream_global->incoming_sopb); grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } @@ -476,7 +504,22 @@ static grpc_chttp2_parse_error update_incoming_window( return GRPC_CHTTP2_CONNECTION_ERROR; } + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "data", transport_parsing, incoming_window, + -(gpr_int64)transport_parsing->incoming_frame_size); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing, + incoming_window_delta, + transport_parsing->incoming_frame_size); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "data", transport_parsing, stream_parsing, incoming_window, + -(gpr_int64)transport_parsing->incoming_frame_size); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing, + incoming_window_delta, + transport_parsing->incoming_frame_size); + transport_parsing->incoming_window -= transport_parsing->incoming_frame_size; + transport_parsing->incoming_window_delta += + transport_parsing->incoming_frame_size; stream_parsing->incoming_window -= transport_parsing->incoming_frame_size; stream_parsing->incoming_window_delta += transport_parsing->incoming_frame_size; @@ -649,6 +692,10 @@ static int init_window_update_frame_parser( &transport_parsing->simple.window_update, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); + if (transport_parsing->incoming_stream_id) { + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); + } transport_parsing->parser = grpc_chttp2_window_update_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.window_update; return ok; @@ -670,8 +717,8 @@ static int init_rst_stream_parser( &transport_parsing->simple.rst_stream, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(transport_parsing, - transport_parsing->incoming_stream_id); + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); if (!transport_parsing->incoming_stream) { return init_skip_frame_parser(transport_parsing, 0); } diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 72431ff6a65..bac9060cb80 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -100,8 +100,9 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } } -static void stream_list_maybe_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { +static void stream_list_maybe_remove(grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { if (s->included[id]) { stream_list_remove(t, s, id); } @@ -299,7 +300,9 @@ int grpc_chttp2_list_pop_incoming_window_updated( void grpc_chttp2_list_remove_incoming_window_updated( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); + stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); } void grpc_chttp2_list_add_read_write_state_changed( @@ -314,7 +317,8 @@ int grpc_chttp2_list_pop_read_write_state_changed( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); + int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); *stream_global = &stream->global; return r; } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 97b2d4471cc..980af295526 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -77,9 +77,18 @@ int grpc_chttp2_unlocking_check_writes( GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window), &stream_writing->sopb); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "write", transport_global, outgoing_window, -(gpr_int64)window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + outgoing_window, -(gpr_int64)window_delta); transport_global->outgoing_window -= window_delta; stream_global->outgoing_window -= window_delta; + gpr_log(GPR_DEBUG, "%s ws:%d nops:%d rc:%d", + transport_global->is_client ? "CLI" : "SVR", + stream_global->write_state, stream_global->outgoing_sopb->nops, + stream_global->read_closed); + if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0) { if (!transport_global->is_client && !stream_global->read_closed) { @@ -115,8 +124,11 @@ int grpc_chttp2_unlocking_check_writes( gpr_slice_buffer_add( &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta)); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + incoming_window, window_delta); stream_global->incoming_window += window_delta; - grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_incoming_window_updated(transport_global, + stream_global); } } @@ -128,6 +140,8 @@ int grpc_chttp2_unlocking_check_writes( transport_global->incoming_window; gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_window_update_create(0, window_delta)); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global, + incoming_window, window_delta); transport_global->incoming_window += window_delta; } @@ -137,7 +151,8 @@ int grpc_chttp2_unlocking_check_writes( void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) { - GPR_ASSERT(transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing)); + GPR_ASSERT(transport_writing->outbuf.count > 0 || + grpc_chttp2_list_have_writing_streams(transport_writing)); finalize_outbuf(transport_writing); @@ -167,6 +182,9 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { + gpr_log(GPR_DEBUG, "%s write %d: sc=%d", + transport_writing->is_client ? "CLI" : "SVR", stream_writing->id, + stream_writing->send_closed); grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3cd6c1f67b6..4d0e0c94c94 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -47,6 +47,7 @@ #include #include #include +#include #include /* #define REFCOUNTING_DEBUG */ @@ -166,15 +167,19 @@ static void destruct_transport(grpc_chttp2_transport *t) { #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(t, r) unref_transport(t, r, __FILE__, __LINE__) -static void unref_transport(grpc_chttp2_transport *t, const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count-1, reason, file, line); +static void unref_transport(grpc_chttp2_transport *t, const char *reason, + const char *file, int line) { + gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, + t->refs.count - 1, reason, file, line); if (!gpr_unref(&t->refs)) return; destruct_transport(t); } -static void ref_transport(grpc_chttp2_transport *t, const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count+1, reason, file, line); - gpr_ref(&t->refs); +static void ref_transport(grpc_chttp2_transport *t, const char *reason, + const char *file, int line) { + gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count, + t->refs.count + 1, reason, file, line); + gpr_ref(&t->refs); } #else #define REF_TRANSPORT(t, r) ref_transport(t) @@ -221,6 +226,7 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; + t->writing.is_client = is_client; gpr_slice_buffer_init(&t->global.qbuf); @@ -378,12 +384,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.outgoing_window = t->global .settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->global.incoming_window = + s->parsing.incoming_window = s->global.incoming_window = t->global .settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; - grpc_chttp2_list_add_incoming_window_updated(&t->global, &s->global); - grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); + grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; } @@ -404,7 +409,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(!s->global.in_stream_map); grpc_chttp2_unregister_stream(t, s); if (!t->parsing_active && s->global.id) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, + s->global.id) == NULL); } gpr_mu_unlock(&t->mu); @@ -525,7 +531,8 @@ void grpc_chttp2_terminate_writing( if (t->ep && !t->endpoint_reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; - UNREF_TRANSPORT(t, "disconnect"); /* safe because we'll still have the ref for write */ + UNREF_TRANSPORT( + t, "disconnect"); /* safe because we'll still have the ref for write */ } unlock(t); @@ -586,7 +593,8 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_incoming_window_updated(transport_global, + stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* cancel out streams that will never be started */ @@ -699,7 +707,8 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, } static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { - grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); + grpc_chttp2_stream *s = + grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -729,25 +738,44 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { } } - while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) { + while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, + &stream_global)) { if (!stream_global->publish_sopb) { + gpr_log(GPR_DEBUG, "%s %d: skip rw update: no publish target", + transport_global->is_client ? "CLI" : "SVR", stream_global->id); continue; } - if (stream_global->write_state != WRITE_STATE_OPEN && stream_global->read_closed && stream_global->in_stream_map) { + if (stream_global->write_state == WRITE_STATE_SENT_CLOSE && + stream_global->read_closed && stream_global->in_stream_map) { if (t->parsing_active) { - grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); + gpr_log(GPR_DEBUG, "%s %d: queue wait for close", + transport_global->is_client ? "CLI" : "SVR", stream_global->id); + grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, + stream_global); } else { + gpr_log(GPR_DEBUG, "%s %d: late removal from map", + transport_global->is_client ? "CLI" : "SVR", stream_global->id); remove_stream(t, stream_global->id); } } - state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map); - if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { + state = compute_state( + stream_global->write_state == WRITE_STATE_SENT_CLOSE, + stream_global->read_closed && !stream_global->in_stream_map); + gpr_log(GPR_DEBUG, "%s %d: state=%d->%d; nops=%d", + transport_global->is_client ? "CLI" : "SVR", stream_global->id, + stream_global->published_state, state, + stream_global->incoming_sopb.nops); + if (stream_global->incoming_sopb.nops == 0 && + state == stream_global->published_state) { continue; } - grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata); + grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( + &stream_global->incoming_metadata, &stream_global->incoming_sopb, + &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_chttp2_schedule_closure(transport_global, stream_global->recv_done_closure, 1); + grpc_chttp2_schedule_closure(transport_global, + stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; @@ -917,7 +945,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (!t->writing_active && t->ep) { grpc_endpoint_destroy(t->ep); t->ep = NULL; - UNREF_TRANSPORT(t, "disconnect"); /* safe as we still have a ref for read */ + UNREF_TRANSPORT( + t, "disconnect"); /* safe as we still have a ref for read */ } unlock(t); UNREF_TRANSPORT(t, "recv_data"); @@ -951,32 +980,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_chttp2_stream_map_size(&t->parsing_stream_map); t->parsing_active = 0; } -#if 0 - while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { - maybe_finish_read(t, s, 0); - } - while ((s = stream_list_remove_head(t, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) { - maybe_join_window_updates(t, s); - } - while ((s = stream_list_remove_head(t, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) { - maybe_join_window_updates(t, s); - } - while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) { - int was_window_empty = s->global.outgoing_window <= 0; - FLOWCTL_TRACE(t, s, outgoing, s->global.id, s->global.outgoing_window_update); - s->global.outgoing_window += s->global.outgoing_window_update; - s->global.outgoing_window_update = 0; - /* if this window update makes outgoing ops writable again, - flag that */ - if (was_window_empty && s->global.outgoing_sopb && - s->global.outgoing_sopb->nops > 0) { - stream_list_join(t, s, WRITABLE); - } - } - t->global.outgoing_window += t->global.outgoing_window_update; - t->global.outgoing_window_update = 0; - maybe_start_some_streams(t); -#endif if (i == nslices) { grpc_endpoint_notify_on_read(t->ep, recv_data, t); } @@ -1079,6 +1082,37 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { unlock(t); } +/* + * TRACING + */ + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, + const char *context, const char *var, + int is_client, gpr_uint32 stream_id, + gpr_int64 current_value, gpr_int64 delta) { + char *identifier; + char *context_scope; + char *context_thread; + char *underscore_pos = strchr(context, '_'); + GPR_ASSERT(underscore_pos); + context_thread = gpr_strdup(underscore_pos + 1); + context_scope = gpr_strdup(context); + context_scope[underscore_pos - context] = 0; + if (stream_id) { + gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id); + } else { + identifier = gpr_strdup(context_scope); + } + gpr_log(GPR_DEBUG, + "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]", + is_client ? "client" : "server", identifier, context_thread, var, + current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, + current_value + delta, reason, file, line); + gpr_free(identifier); + gpr_free(context_thread); + gpr_free(context_scope); +} + /* * INTEGRATION GLUE */