Flow control bugfixes

pull/2149/head
Craig Tiller 10 years ago
parent d53d21428c
commit 285b882157
  1. 7
      src/core/transport/chttp2/frame_data.c
  2. 7
      src/core/transport/chttp2/frame_window_update.c
  3. 2
      src/core/transport/chttp2/hpack_parser.c
  4. 17
      src/core/transport/chttp2/incoming_metadata.c
  5. 5
      src/core/transport/chttp2/incoming_metadata.h
  6. 28
      src/core/transport/chttp2/internal.h
  7. 117
      src/core/transport/chttp2/parsing.c
  8. 12
      src/core/transport/chttp2/stream_lists.c
  9. 22
      src/core/transport/chttp2/writing.c
  10. 126
      src/core/transport/chttp2_transport.c

@ -133,8 +133,13 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) == p->frame_size) {
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if ((gpr_uint32)(end - cur) == p->frame_size) {
grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg));
p->state = GRPC_CHTTP2_DATA_FH_0;

@ -96,9 +96,16 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
if (transport_parsing->incoming_stream_id) {
if (stream_parsing) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
stream_parsing, outgoing_window_update,
p->amount);
stream_parsing->outgoing_window_update += p->amount;
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
} else {
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("update", transport_parsing,
outgoing_window_update, p->amount);
transport_parsing->outgoing_window_update += p->amount;
}
}

@ -1395,6 +1395,8 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
&stream_parsing->incoming_metadata,
&stream_parsing->data_parser.incoming_sopb);
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
if (parser->is_eof) {
stream_parsing->received_close = 1;

@ -71,7 +71,8 @@ void grpc_chttp2_incoming_metadata_live_op_buffer_end(
buffer->elems = NULL;
}
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) {
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) {
grpc_metadata_batch b;
b.list.head = NULL;
@ -79,7 +80,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_
we can reconstitute the list.
We can't do list building here as later incoming metadata may reallocate
the underlying array. */
b.list.tail = (void*)(gpr_intptr)buffer->count;
b.list.tail = (void *)(gpr_intptr)buffer->count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = buffer->deadline;
buffer->deadline = gpr_inf_future;
@ -87,14 +88,15 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_
grpc_sopb_add_metadata(sopb, b);
}
void grpc_chttp2_incoming_metadata_buffer_swap(grpc_chttp2_incoming_metadata_buffer *a, grpc_chttp2_incoming_metadata_buffer *b) {
void grpc_chttp2_incoming_metadata_buffer_swap(
grpc_chttp2_incoming_metadata_buffer *a,
grpc_chttp2_incoming_metadata_buffer *b) {
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b);
}
void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
grpc_chttp2_incoming_metadata_buffer *src,
grpc_chttp2_incoming_metadata_buffer *dst,
grpc_stream_op_buffer *sopb) {
grpc_chttp2_incoming_metadata_buffer *src,
grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) {
size_t delta;
size_t i;
if (gpr_time_cmp(dst->deadline, gpr_inf_future) == 0) {
@ -119,7 +121,8 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
dst->count += src->count;
for (i = 0; i < sopb->nops; i++) {
if (sopb->ops[i].type != GRPC_OP_METADATA) continue;
sopb->ops[i].data.metadata.list.tail = (void*)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
sopb->ops[i].data.metadata.list.tail =
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
}
}

@ -67,9 +67,8 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
grpc_chttp2_incoming_metadata_buffer *src,
grpc_chttp2_incoming_metadata_buffer *dst,
grpc_stream_op_buffer *sopb);
grpc_chttp2_incoming_metadata_buffer *src,
grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb);
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,

@ -218,6 +218,8 @@ typedef struct {
gpr_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
gpr_uint8 is_client;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@ -252,6 +254,7 @@ struct grpc_chttp2_transport_parsing {
/** window available for peer to send to us */
gpr_uint32 incoming_window;
gpr_uint32 incoming_window_delta;
/** next stream id available at the time of beginning parsing */
gpr_uint32 next_stream_id;
@ -601,13 +604,15 @@ void grpc_chttp2_for_all_streams(
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing);
void grpc_chttp2_parsing_become_skip_parser(
grpc_chttp2_transport_parsing *transport_parsing);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
extern int grpc_http_trace;
extern int grpc_flowctl_trace;
#define IF_TRACING(stmt) \
if (!(grpc_http_trace)) \
@ -615,4 +620,25 @@ extern int grpc_http_trace;
else \
stmt
#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \
delta) \
if (!(grpc_flowctl_trace)) { \
} else { \
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
transport->is_client, context->id, context->var, \
delta); \
}
#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \
if (!(grpc_flowctl_trace)) { \
} else { \
grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
context->is_client, 0, context->var, delta); \
}
void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
const char *context, const char *var,
int is_client, gpr_uint32 stream_id,
gpr_int64 current_value, gpr_int64 delta);
#endif

@ -60,16 +60,30 @@ static int init_skip_frame_parser(
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice, int is_last);
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing) {
void grpc_chttp2_prepare_to_read(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
/* update the parsing view of incoming window */
transport_parsing->incoming_window = transport_global->incoming_window;
if (transport_parsing->incoming_window != transport_global->incoming_window) {
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"parse", transport_parsing, incoming_window,
(gpr_int64)transport_global->incoming_window -
(gpr_int64)transport_parsing->incoming_window);
transport_parsing->incoming_window = transport_global->incoming_window;
}
while (grpc_chttp2_list_pop_incoming_window_updated(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
stream_parsing->incoming_window = transport_parsing->incoming_window;
stream_parsing->id = stream_global->id;
if (stream_parsing->incoming_window != stream_global->incoming_window) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parse", transport_parsing, stream_parsing, incoming_window,
(gpr_int64)stream_global->incoming_window -
(gpr_int64)stream_parsing->incoming_window);
stream_parsing->incoming_window = stream_global->incoming_window;
}
}
}
@ -95,33 +109,6 @@ void grpc_chttp2_publish_reads(
/* TODO(ctiller): re-implement */
GPR_ASSERT(transport_parsing->initial_window_update == 0);
#if 0
while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
int publish = 0;
GPR_ASSERT(s->incoming_sopb);
*s->publish_state =
compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
if (*s->publish_state != s->published_state) {
s->published_state = *s->publish_state;
publish = 1;
if (s->published_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
}
}
if (s->parser.incoming_sopb.nops > 0) {
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
publish = 1;
}
if (publish) {
if (s->incoming_metadata_count > 0) {
patch_metadata_ops(s);
}
s->incoming_sopb = NULL;
schedule_cb(t, s->global.recv_done_closure, 1);
}
}
#endif
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
@ -149,11 +136,42 @@ void grpc_chttp2_publish_reads(
transport_parsing->goaway_received = 0;
}
/* propagate flow control tokens to global state */
if (transport_parsing->outgoing_window_update) {
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"parsed", transport_global, outgoing_window,
transport_parsing->outgoing_window_update);
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"parsed", transport_parsing, outgoing_window_update,
-(gpr_int64)transport_parsing->outgoing_window_update);
transport_global->outgoing_window +=
transport_parsing->outgoing_window_update;
transport_parsing->outgoing_window_update = 0;
}
if (transport_parsing->incoming_window_delta) {
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"parsed", transport_global, incoming_window,
-(gpr_int64)transport_parsing->incoming_window_delta);
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"parsed", transport_parsing, incoming_window_delta,
-(gpr_int64)transport_parsing->incoming_window_delta);
transport_global->incoming_window -=
transport_parsing->incoming_window_delta;
transport_parsing->incoming_window_delta = 0;
}
/* for each stream that saw an update, fixup global state */
while (grpc_chttp2_list_pop_parsing_seen_stream(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
/* update incoming flow control window */
if (stream_parsing->incoming_window_delta) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_global, incoming_window,
-(gpr_int64)stream_parsing->incoming_window_delta);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, incoming_window_delta,
-(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
@ -164,9 +182,16 @@ void grpc_chttp2_publish_reads(
if (stream_parsing->outgoing_window_update) {
int was_zero = stream_global->outgoing_window <= 0;
int is_zero;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing,
stream_global, outgoing_window,
stream_parsing->outgoing_window_update);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, outgoing_window_update,
-(gpr_int64)stream_parsing->outgoing_window_update);
stream_global->outgoing_window += stream_parsing->outgoing_window_update;
stream_parsing->outgoing_window_update = 0;
is_zero = stream_global->outgoing_window <= 0;
gpr_log(GPR_DEBUG, "was=%d is=%d", was_zero, is_zero);
if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
@ -186,8 +211,11 @@ void grpc_chttp2_publish_reads(
/* publish incoming stream ops */
if (stream_parsing->data_parser.incoming_sopb.nops > 0) {
grpc_incoming_metadata_buffer_move_to_referencing_sopb(&stream_parsing->incoming_metadata, &stream_global->incoming_metadata, &stream_parsing->data_parser.incoming_sopb);
grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, &stream_global->incoming_sopb);
grpc_incoming_metadata_buffer_move_to_referencing_sopb(
&stream_parsing->incoming_metadata, &stream_global->incoming_metadata,
&stream_parsing->data_parser.incoming_sopb);
grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb,
&stream_global->incoming_sopb);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
@ -476,7 +504,22 @@ static grpc_chttp2_parse_error update_incoming_window(
return GRPC_CHTTP2_CONNECTION_ERROR;
}
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"data", transport_parsing, incoming_window,
-(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing,
incoming_window_delta,
transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"data", transport_parsing, stream_parsing, incoming_window,
-(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing,
incoming_window_delta,
transport_parsing->incoming_frame_size);
transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
transport_parsing->incoming_window_delta +=
transport_parsing->incoming_frame_size;
stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
stream_parsing->incoming_window_delta +=
transport_parsing->incoming_frame_size;
@ -649,6 +692,10 @@ static int init_window_update_frame_parser(
&transport_parsing->simple.window_update,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags);
if (transport_parsing->incoming_stream_id) {
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
}
transport_parsing->parser = grpc_chttp2_window_update_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.window_update;
return ok;
@ -670,8 +717,8 @@ static int init_rst_stream_parser(
&transport_parsing->simple.rst_stream,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags);
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(transport_parsing,
transport_parsing->incoming_stream_id);
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
if (!transport_parsing->incoming_stream) {
return init_skip_frame_parser(transport_parsing, 0);
}

@ -100,8 +100,9 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}
static void stream_list_maybe_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
static void stream_list_maybe_remove(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
stream_list_remove(t, s, id);
}
@ -299,7 +300,9 @@ int grpc_chttp2_list_pop_incoming_window_updated(
void grpc_chttp2_list_remove_incoming_window_updated(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
}
void grpc_chttp2_list_add_read_write_state_changed(
@ -314,7 +317,8 @@ int grpc_chttp2_list_pop_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
*stream_global = &stream->global;
return r;
}

@ -77,9 +77,18 @@ int grpc_chttp2_unlocking_check_writes(
GPR_MIN(transport_global->outgoing_window,
stream_global->outgoing_window),
&stream_writing->sopb);
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
"write", transport_global, outgoing_window, -(gpr_int64)window_delta);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
outgoing_window, -(gpr_int64)window_delta);
transport_global->outgoing_window -= window_delta;
stream_global->outgoing_window -= window_delta;
gpr_log(GPR_DEBUG, "%s ws:%d nops:%d rc:%d",
transport_global->is_client ? "CLI" : "SVR",
stream_global->write_state, stream_global->outgoing_sopb->nops,
stream_global->read_closed);
if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE &&
stream_global->outgoing_sopb->nops == 0) {
if (!transport_global->is_client && !stream_global->read_closed) {
@ -115,8 +124,11 @@ int grpc_chttp2_unlocking_check_writes(
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_global->id, window_delta));
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
incoming_window, window_delta);
stream_global->incoming_window += window_delta;
grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global);
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
}
}
@ -128,6 +140,8 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->incoming_window;
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global,
incoming_window, window_delta);
transport_global->incoming_window += window_delta;
}
@ -137,7 +151,8 @@ int grpc_chttp2_unlocking_check_writes(
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
GPR_ASSERT(transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing));
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
finalize_outbuf(transport_writing);
@ -167,6 +182,9 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
gpr_log(GPR_DEBUG, "%s write %d: sc=%d",
transport_writing->is_client ? "CLI" : "SVR", stream_writing->id,
stream_writing->send_closed);
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
stream_writing->send_closed != DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor,

@ -47,6 +47,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
/* #define REFCOUNTING_DEBUG */
@ -166,15 +167,19 @@ static void destruct_transport(grpc_chttp2_transport *t) {
#ifdef REFCOUNTING_DEBUG
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
#define UNREF_TRANSPORT(t, r) unref_transport(t, r, __FILE__, __LINE__)
static void unref_transport(grpc_chttp2_transport *t, const char *reason, const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count-1, reason, file, line);
static void unref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
t->refs.count - 1, reason, file, line);
if (!gpr_unref(&t->refs)) return;
destruct_transport(t);
}
static void ref_transport(grpc_chttp2_transport *t, const char *reason, const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count+1, reason, file, line);
gpr_ref(&t->refs);
static void ref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count,
t->refs.count + 1, reason, file, line);
gpr_ref(&t->refs);
}
#else
#define REF_TRANSPORT(t, r) ref_transport(t)
@ -221,6 +226,7 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
gpr_slice_buffer_init(&t->global.qbuf);
@ -378,12 +384,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global
.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
s->parsing.incoming_window = s->global.incoming_window =
t->global
.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_list_add_incoming_window_updated(&t->global, &s->global);
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
s->global.in_stream_map = 1;
}
@ -404,7 +409,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(!s->global.in_stream_map);
grpc_chttp2_unregister_stream(t, s);
if (!t->parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL);
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL);
}
gpr_mu_unlock(&t->mu);
@ -525,7 +531,8 @@ void grpc_chttp2_terminate_writing(
if (t->ep && !t->endpoint_reading) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
UNREF_TRANSPORT(t, "disconnect"); /* safe because we'll still have the ref for write */
UNREF_TRANSPORT(
t, "disconnect"); /* safe because we'll still have the ref for write */
}
unlock(t);
@ -586,7 +593,8 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global);
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
/* cancel out streams that will never be started */
@ -699,7 +707,8 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
}
static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
grpc_chttp2_stream *s =
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
GPR_ASSERT(s);
s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) {
@ -729,25 +738,44 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
}
}
while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) {
while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
&stream_global)) {
if (!stream_global->publish_sopb) {
gpr_log(GPR_DEBUG, "%s %d: skip rw update: no publish target",
transport_global->is_client ? "CLI" : "SVR", stream_global->id);
continue;
}
if (stream_global->write_state != WRITE_STATE_OPEN && stream_global->read_closed && stream_global->in_stream_map) {
if (stream_global->write_state == WRITE_STATE_SENT_CLOSE &&
stream_global->read_closed && stream_global->in_stream_map) {
if (t->parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global);
gpr_log(GPR_DEBUG, "%s %d: queue wait for close",
transport_global->is_client ? "CLI" : "SVR", stream_global->id);
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
stream_global);
} else {
gpr_log(GPR_DEBUG, "%s %d: late removal from map",
transport_global->is_client ? "CLI" : "SVR", stream_global->id);
remove_stream(t, stream_global->id);
}
}
state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map);
if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) {
state = compute_state(
stream_global->write_state == WRITE_STATE_SENT_CLOSE,
stream_global->read_closed && !stream_global->in_stream_map);
gpr_log(GPR_DEBUG, "%s %d: state=%d->%d; nops=%d",
transport_global->is_client ? "CLI" : "SVR", stream_global->id,
stream_global->published_state, state,
stream_global->incoming_sopb.nops);
if (stream_global->incoming_sopb.nops == 0 &&
state == stream_global->published_state) {
continue;
}
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata);
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
&stream_global->incoming_metadata, &stream_global->incoming_sopb,
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
grpc_chttp2_schedule_closure(transport_global, stream_global->recv_done_closure, 1);
grpc_chttp2_schedule_closure(transport_global,
stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;
@ -917,7 +945,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (!t->writing_active && t->ep) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
UNREF_TRANSPORT(t, "disconnect"); /* safe as we still have a ref for read */
UNREF_TRANSPORT(
t, "disconnect"); /* safe as we still have a ref for read */
}
unlock(t);
UNREF_TRANSPORT(t, "recv_data");
@ -951,32 +980,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
t->parsing_active = 0;
}
#if 0
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
while ((s = stream_list_remove_head(t, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
maybe_join_window_updates(t, s);
}
while ((s = stream_list_remove_head(t, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) {
maybe_join_window_updates(t, s);
}
while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) {
int was_window_empty = s->global.outgoing_window <= 0;
FLOWCTL_TRACE(t, s, outgoing, s->global.id, s->global.outgoing_window_update);
s->global.outgoing_window += s->global.outgoing_window_update;
s->global.outgoing_window_update = 0;
/* if this window update makes outgoing ops writable again,
flag that */
if (was_window_empty && s->global.outgoing_sopb &&
s->global.outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
t->global.outgoing_window += t->global.outgoing_window_update;
t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
#endif
if (i == nslices) {
grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}
@ -1079,6 +1082,37 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
unlock(t);
}
/*
* TRACING
*/
void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
const char *context, const char *var,
int is_client, gpr_uint32 stream_id,
gpr_int64 current_value, gpr_int64 delta) {
char *identifier;
char *context_scope;
char *context_thread;
char *underscore_pos = strchr(context, '_');
GPR_ASSERT(underscore_pos);
context_thread = gpr_strdup(underscore_pos + 1);
context_scope = gpr_strdup(context);
context_scope[underscore_pos - context] = 0;
if (stream_id) {
gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id);
} else {
identifier = gpr_strdup(context_scope);
}
gpr_log(GPR_DEBUG,
"FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]",
is_client ? "client" : "server", identifier, context_thread, var,
current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
current_value + delta, reason, file, line);
gpr_free(identifier);
gpr_free(context_thread);
gpr_free(context_scope);
}
/*
* INTEGRATION GLUE
*/

Loading…
Cancel
Save