clang-format, and process on lock splitting

pull/2149/head
Craig Tiller 10 years ago
parent 606d874d16
commit 4aa71a1774
  1. 76
      src/core/transport/chttp2/bin_encoder.c
  2. 4
      src/core/transport/chttp2/frame_data.c
  3. 3
      src/core/transport/chttp2/frame_data.h
  4. 4
      src/core/transport/chttp2/frame_goaway.c
  5. 3
      src/core/transport/chttp2/frame_goaway.h
  6. 11
      src/core/transport/chttp2/frame_ping.c
  7. 3
      src/core/transport/chttp2/frame_ping.h
  8. 7
      src/core/transport/chttp2/frame_rst_stream.c
  9. 3
      src/core/transport/chttp2/frame_rst_stream.h
  10. 6
      src/core/transport/chttp2/frame_settings.c
  11. 3
      src/core/transport/chttp2/frame_settings.h
  12. 4
      src/core/transport/chttp2/frame_window_update.c
  13. 3
      src/core/transport/chttp2/frame_window_update.h
  14. 33
      src/core/transport/chttp2/gen_hpack_tables.c
  15. 32
      src/core/transport/chttp2/hpack_parser.c
  16. 4
      src/core/transport/chttp2/hpack_parser.h
  17. 3
      src/core/transport/chttp2/hpack_table.c
  18. 323
      src/core/transport/chttp2/huffsyms.c
  19. 142
      src/core/transport/chttp2/internal.h
  20. 310
      src/core/transport/chttp2/parsing.c
  21. 3
      src/core/transport/chttp2/stream_map.h
  22. 3
      src/core/transport/chttp2/varint.h
  23. 86
      src/core/transport/chttp2/writing.c
  24. 258
      src/core/transport/chttp2_transport.c
  25. 5
      src/core/transport/transport.c
  26. 7
      src/core/transport/transport.h
  27. 3
      src/core/transport/transport_impl.h

@ -46,70 +46,18 @@ typedef struct {
gpr_uint8 length;
} b64_huff_sym;
static const b64_huff_sym huff_alphabet[64] = {{0x21, 6},
{0x5d, 7},
{0x5e, 7},
{0x5f, 7},
{0x60, 7},
{0x61, 7},
{0x62, 7},
{0x63, 7},
{0x64, 7},
{0x65, 7},
{0x66, 7},
{0x67, 7},
{0x68, 7},
{0x69, 7},
{0x6a, 7},
{0x6b, 7},
{0x6c, 7},
{0x6d, 7},
{0x6e, 7},
{0x6f, 7},
{0x70, 7},
{0x71, 7},
{0x72, 7},
{0xfc, 8},
{0x73, 7},
{0xfd, 8},
{0x3, 5},
{0x23, 6},
{0x4, 5},
{0x24, 6},
{0x5, 5},
{0x25, 6},
{0x26, 6},
{0x27, 6},
{0x6, 5},
{0x74, 7},
{0x75, 7},
{0x28, 6},
{0x29, 6},
{0x2a, 6},
{0x7, 5},
{0x2b, 6},
{0x76, 7},
{0x2c, 6},
{0x8, 5},
{0x9, 5},
{0x2d, 6},
{0x77, 7},
{0x78, 7},
{0x79, 7},
{0x7a, 7},
{0x7b, 7},
{0x0, 5},
{0x1, 5},
{0x2, 5},
{0x19, 6},
{0x1a, 6},
{0x1b, 6},
{0x1c, 6},
{0x1d, 6},
{0x1e, 6},
{0x1f, 6},
{0x7fb, 11},
{0x18, 6}};
static const b64_huff_sym huff_alphabet[64] = {
{0x21, 6}, {0x5d, 7}, {0x5e, 7}, {0x5f, 7}, {0x60, 7}, {0x61, 7},
{0x62, 7}, {0x63, 7}, {0x64, 7}, {0x65, 7}, {0x66, 7}, {0x67, 7},
{0x68, 7}, {0x69, 7}, {0x6a, 7}, {0x6b, 7}, {0x6c, 7}, {0x6d, 7},
{0x6e, 7}, {0x6f, 7}, {0x70, 7}, {0x71, 7}, {0x72, 7}, {0xfc, 8},
{0x73, 7}, {0xfd, 8}, {0x3, 5}, {0x23, 6}, {0x4, 5}, {0x24, 6},
{0x5, 5}, {0x25, 6}, {0x26, 6}, {0x27, 6}, {0x6, 5}, {0x74, 7},
{0x75, 7}, {0x28, 6}, {0x29, 6}, {0x2a, 6}, {0x7, 5}, {0x2b, 6},
{0x76, 7}, {0x2c, 6}, {0x8, 5}, {0x9, 5}, {0x2d, 6}, {0x77, 7},
{0x78, 7}, {0x79, 7}, {0x7a, 7}, {0x7b, 7}, {0x0, 5}, {0x1, 5},
{0x2, 5}, {0x19, 6}, {0x1a, 6}, {0x1b, 6}, {0x1c, 6}, {0x1d, 6},
{0x1e, 6}, {0x1f, 6}, {0x7fb, 11}, {0x18, 6}};
static const gpr_uint8 tail_xtra[3] = {0, 2, 3};

@ -70,8 +70,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;

@ -72,7 +72,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);

@ -63,8 +63,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;

@ -65,7 +65,8 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code,
gpr_slice debug_data,

@ -69,15 +69,14 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_ping_parser *p = parser;
grpc_chttp2_outstanding_ping *ping;
while (p->byte != 8 && cur != end) {
p->opaque_8bytes[p->byte] = *cur;
cur++;
@ -87,7 +86,8 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) {
for (ping = transport_parsing->pings.next;
ping != &transport_parsing->pings; ping = ping->next) {
if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
grpc_iomgr_add_delayed_callback(ping->on_recv, 1);
}
@ -96,8 +96,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
gpr_free(ping);
}
} else {
gpr_slice_buffer_add(
&transport_parsing->qbuf,
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_ping_create(1, p->opaque_8bytes));
}
}

@ -48,6 +48,7 @@ gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */

@ -62,7 +62,8 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
if (length != 4) {
gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length,
flags);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
parser->byte = 0;
@ -70,8 +71,8 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;

@ -47,6 +47,7 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */

@ -138,7 +138,8 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
void *p, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
void *p, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_settings_parser *parser = p;
const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice);
const gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
@ -155,7 +156,8 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
if (is_last) {
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create());
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_settings_ack_create());
}
return GRPC_CHTTP2_PARSE_OK;
}

@ -94,6 +94,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_settings_parser *parser, gpr_uint32 length, gpr_uint8 flags,
gpr_uint32 *settings);
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */

@ -74,8 +74,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;

@ -50,6 +50,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, gpr_uint32 length,
gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */

@ -55,19 +55,15 @@ typedef struct {
unsigned char index;
} spec;
static const spec fields[] = {{"INDEXED_FIELD", 0X80, 1, 1},
{"INDEXED_FIELD_X", 0X80, 1, 2},
{"LITHDR_INCIDX", 0X40, 2, 1},
{"LITHDR_INCIDX_X", 0X40, 2, 2},
{"LITHDR_INCIDX_V", 0X40, 2, 0},
{"LITHDR_NOTIDX", 0X00, 4, 1},
{"LITHDR_NOTIDX_X", 0X00, 4, 2},
{"LITHDR_NOTIDX_V", 0X00, 4, 0},
{"LITHDR_NVRIDX", 0X10, 4, 1},
{"LITHDR_NVRIDX_X", 0X10, 4, 2},
{"LITHDR_NVRIDX_V", 0X10, 4, 0},
{"MAX_TBL_SIZE", 0X20, 3, 1},
{"MAX_TBL_SIZE_X", 0X20, 3, 2}, };
static const spec fields[] = {
{"INDEXED_FIELD", 0X80, 1, 1}, {"INDEXED_FIELD_X", 0X80, 1, 2},
{"LITHDR_INCIDX", 0X40, 2, 1}, {"LITHDR_INCIDX_X", 0X40, 2, 2},
{"LITHDR_INCIDX_V", 0X40, 2, 0}, {"LITHDR_NOTIDX", 0X00, 4, 1},
{"LITHDR_NOTIDX_X", 0X00, 4, 2}, {"LITHDR_NOTIDX_V", 0X00, 4, 0},
{"LITHDR_NVRIDX", 0X10, 4, 1}, {"LITHDR_NVRIDX_X", 0X10, 4, 2},
{"LITHDR_NVRIDX_V", 0X10, 4, 0}, {"MAX_TBL_SIZE", 0X20, 3, 1},
{"MAX_TBL_SIZE_X", 0X20, 3, 2},
};
static const int num_fields = sizeof(fields) / sizeof(*fields);
@ -129,13 +125,9 @@ static void generate_first_byte_lut(void) {
#define MAXHUFFSTATES 1024
/* represents a set of symbols as an array of booleans indicating inclusion */
typedef struct {
char included[GRPC_CHTTP2_NUM_HUFFSYMS];
} symset;
typedef struct { char included[GRPC_CHTTP2_NUM_HUFFSYMS]; } symset;
/* represents a lookup table indexed by a nibble */
typedef struct {
int values[16];
} nibblelut;
typedef struct { int values[16]; } nibblelut;
/* returns a symset that includes all possible symbols */
static symset symset_all(void) {
@ -268,8 +260,7 @@ static void build_dec_tbl(int state, int nibble, int nibbits, unsigned bitofs,
/* recurse down for this bit */
build_dec_tbl(state, (nibble << 1) | bit, nibbits + 1, bitofs + 1, emit,
nextsyms);
next:
;
next:;
}
}

@ -150,10 +150,12 @@ typedef enum {
/* jump table of parse state functions -- order must match first_byte_type
above */
static const grpc_chttp2_hpack_parser_state first_byte_action[] = {
parse_indexed_field, parse_indexed_field_x, parse_lithdr_incidx,
parse_lithdr_incidx_x, parse_lithdr_incidx_v, parse_lithdr_notidx,
parse_lithdr_notidx_x, parse_lithdr_notidx_v, parse_lithdr_nvridx,
parse_lithdr_nvridx_x, parse_lithdr_nvridx_v, parse_max_tbl_size,
parse_indexed_field, parse_indexed_field_x,
parse_lithdr_incidx, parse_lithdr_incidx_x,
parse_lithdr_incidx_v, parse_lithdr_notidx,
parse_lithdr_notidx_x, parse_lithdr_notidx_v,
parse_lithdr_nvridx, parse_lithdr_nvridx_x,
parse_lithdr_nvridx_v, parse_max_tbl_size,
parse_max_tbl_size_x, parse_error};
/* indexes the first byte to a parse state function - generated by
@ -222,7 +224,8 @@ static const gpr_uint8 first_byte_lut[256] = {
INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD,
INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD,
INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD,
INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD_X, };
INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD, INDEXED_FIELD_X,
};
/* state table for huffman decoding: given a state, gives an index/16 into
next_sub_tbl. Taking that index and adding the value of the nibble being
@ -242,7 +245,8 @@ static const gpr_uint8 next_tbl[256] = {
38, 1, 1, 1, 1, 1, 1, 1, 15, 2, 2, 2, 2, 26, 3, 3, 39, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 7, 3, 3, 3, 40, 2,
41, 1, 1, 1, 42, 43, 1, 1, 44, 1, 1, 1, 1, 15, 2, 2, 2, 2, 2, 2,
3, 3, 3, 45, 46, 1, 1, 2, 2, 2, 35, 3, 3, 18, 47, 2, };
3, 3, 3, 45, 46, 1, 1, 2, 2, 2, 35, 3, 3, 18, 47, 2,
};
/* next state, based upon current state and the current nibble: see above.
generated by gen_hpack_tables.c */
static const gpr_int16 next_sub_tbl[48 * 16] = {
@ -297,7 +301,8 @@ static const gpr_int16 next_sub_tbl[48 * 16] = {
4, 8, 4, 8, 4, 8, 4, 8, 4, 8, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 245, 246, 247, 248, 249, 250, 251, 252,
253, 254, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 255, };
0, 0, 255,
};
/* emission table: indexed like next_tbl, ultimately gives the byte to be
emitted, or -1 for no byte, or 256 for end of stream
@ -320,7 +325,8 @@ static const gpr_uint16 emit_tbl[256] = {
204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218,
219, 220, 221, 0, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232,
233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247,
248, };
248,
};
/* generated by gen_hpack_tables.c */
static const gpr_int16 emit_sub_tbl[249 * 16] = {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
@ -588,7 +594,8 @@ static const gpr_int16 emit_sub_tbl[249 * 16] = {
251, 251, 252, 252, 253, 253, 254, 254, 2, 3, 4, 5, 6, 7, 8,
11, 12, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27,
28, 29, 30, 31, 127, 220, 249, -1, 10, 10, 10, 10, 13, 13, 13,
13, 22, 22, 22, 22, 256, 256, 256, 256, };
13, 22, 22, 22, 22, 256, 256, 256, 256,
};
static const gpr_uint8 inverse_base64[256] = {
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
@ -608,7 +615,8 @@ static const gpr_uint8 inverse_base64[256] = {
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, };
255,
};
/* emission helpers */
static void on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md,
@ -1370,8 +1378,8 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
}
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
GPR_SLICE_END_PTR(slice))) {

@ -107,7 +107,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
/* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for
the transport */
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last);
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */

@ -104,7 +104,8 @@ static struct {
/* 58: */ {"user-agent", ""},
/* 59: */ {"vary", ""},
/* 60: */ {"via", ""},
/* 61: */ {"www-authenticate", ""}, };
/* 61: */ {"www-authenticate", ""},
};
void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl, grpc_mdctx *mdctx) {
size_t i;

@ -37,260 +37,69 @@
command:
:%s/.* \([0-9a-f]\+\) \[ *\([0-9]\+\)\]/{0x\1, \2},/g */
const grpc_chttp2_huffsym grpc_chttp2_huffsyms[GRPC_CHTTP2_NUM_HUFFSYMS] = {
{0x1ff8, 13},
{0x7fffd8, 23},
{0xfffffe2, 28},
{0xfffffe3, 28},
{0xfffffe4, 28},
{0xfffffe5, 28},
{0xfffffe6, 28},
{0xfffffe7, 28},
{0xfffffe8, 28},
{0xffffea, 24},
{0x3ffffffc, 30},
{0xfffffe9, 28},
{0xfffffea, 28},
{0x3ffffffd, 30},
{0xfffffeb, 28},
{0xfffffec, 28},
{0xfffffed, 28},
{0xfffffee, 28},
{0xfffffef, 28},
{0xffffff0, 28},
{0xffffff1, 28},
{0xffffff2, 28},
{0x3ffffffe, 30},
{0xffffff3, 28},
{0xffffff4, 28},
{0xffffff5, 28},
{0xffffff6, 28},
{0xffffff7, 28},
{0xffffff8, 28},
{0xffffff9, 28},
{0xffffffa, 28},
{0xffffffb, 28},
{0x14, 6},
{0x3f8, 10},
{0x3f9, 10},
{0xffa, 12},
{0x1ff9, 13},
{0x15, 6},
{0xf8, 8},
{0x7fa, 11},
{0x3fa, 10},
{0x3fb, 10},
{0xf9, 8},
{0x7fb, 11},
{0xfa, 8},
{0x16, 6},
{0x17, 6},
{0x18, 6},
{0x0, 5},
{0x1, 5},
{0x2, 5},
{0x19, 6},
{0x1a, 6},
{0x1b, 6},
{0x1c, 6},
{0x1d, 6},
{0x1e, 6},
{0x1f, 6},
{0x5c, 7},
{0xfb, 8},
{0x7ffc, 15},
{0x20, 6},
{0xffb, 12},
{0x3fc, 10},
{0x1ffa, 13},
{0x21, 6},
{0x5d, 7},
{0x5e, 7},
{0x5f, 7},
{0x60, 7},
{0x61, 7},
{0x62, 7},
{0x63, 7},
{0x64, 7},
{0x65, 7},
{0x66, 7},
{0x67, 7},
{0x68, 7},
{0x69, 7},
{0x6a, 7},
{0x6b, 7},
{0x6c, 7},
{0x6d, 7},
{0x6e, 7},
{0x6f, 7},
{0x70, 7},
{0x71, 7},
{0x72, 7},
{0xfc, 8},
{0x73, 7},
{0xfd, 8},
{0x1ffb, 13},
{0x7fff0, 19},
{0x1ffc, 13},
{0x3ffc, 14},
{0x22, 6},
{0x7ffd, 15},
{0x3, 5},
{0x23, 6},
{0x4, 5},
{0x24, 6},
{0x5, 5},
{0x25, 6},
{0x26, 6},
{0x27, 6},
{0x6, 5},
{0x74, 7},
{0x75, 7},
{0x28, 6},
{0x29, 6},
{0x2a, 6},
{0x7, 5},
{0x2b, 6},
{0x76, 7},
{0x2c, 6},
{0x8, 5},
{0x9, 5},
{0x2d, 6},
{0x77, 7},
{0x78, 7},
{0x79, 7},
{0x7a, 7},
{0x7b, 7},
{0x7ffe, 15},
{0x7fc, 11},
{0x3ffd, 14},
{0x1ffd, 13},
{0xffffffc, 28},
{0xfffe6, 20},
{0x3fffd2, 22},
{0xfffe7, 20},
{0xfffe8, 20},
{0x3fffd3, 22},
{0x3fffd4, 22},
{0x3fffd5, 22},
{0x7fffd9, 23},
{0x3fffd6, 22},
{0x7fffda, 23},
{0x7fffdb, 23},
{0x7fffdc, 23},
{0x7fffdd, 23},
{0x7fffde, 23},
{0xffffeb, 24},
{0x7fffdf, 23},
{0xffffec, 24},
{0xffffed, 24},
{0x3fffd7, 22},
{0x7fffe0, 23},
{0xffffee, 24},
{0x7fffe1, 23},
{0x7fffe2, 23},
{0x7fffe3, 23},
{0x7fffe4, 23},
{0x1fffdc, 21},
{0x3fffd8, 22},
{0x7fffe5, 23},
{0x3fffd9, 22},
{0x7fffe6, 23},
{0x7fffe7, 23},
{0xffffef, 24},
{0x3fffda, 22},
{0x1fffdd, 21},
{0xfffe9, 20},
{0x3fffdb, 22},
{0x3fffdc, 22},
{0x7fffe8, 23},
{0x7fffe9, 23},
{0x1fffde, 21},
{0x7fffea, 23},
{0x3fffdd, 22},
{0x3fffde, 22},
{0xfffff0, 24},
{0x1fffdf, 21},
{0x3fffdf, 22},
{0x7fffeb, 23},
{0x7fffec, 23},
{0x1fffe0, 21},
{0x1fffe1, 21},
{0x3fffe0, 22},
{0x1fffe2, 21},
{0x7fffed, 23},
{0x3fffe1, 22},
{0x7fffee, 23},
{0x7fffef, 23},
{0xfffea, 20},
{0x3fffe2, 22},
{0x3fffe3, 22},
{0x3fffe4, 22},
{0x7ffff0, 23},
{0x3fffe5, 22},
{0x3fffe6, 22},
{0x7ffff1, 23},
{0x3ffffe0, 26},
{0x3ffffe1, 26},
{0xfffeb, 20},
{0x7fff1, 19},
{0x3fffe7, 22},
{0x7ffff2, 23},
{0x3fffe8, 22},
{0x1ffffec, 25},
{0x3ffffe2, 26},
{0x3ffffe3, 26},
{0x3ffffe4, 26},
{0x7ffffde, 27},
{0x7ffffdf, 27},
{0x3ffffe5, 26},
{0xfffff1, 24},
{0x1ffffed, 25},
{0x7fff2, 19},
{0x1fffe3, 21},
{0x3ffffe6, 26},
{0x7ffffe0, 27},
{0x7ffffe1, 27},
{0x3ffffe7, 26},
{0x7ffffe2, 27},
{0xfffff2, 24},
{0x1fffe4, 21},
{0x1fffe5, 21},
{0x3ffffe8, 26},
{0x3ffffe9, 26},
{0xffffffd, 28},
{0x7ffffe3, 27},
{0x7ffffe4, 27},
{0x7ffffe5, 27},
{0xfffec, 20},
{0xfffff3, 24},
{0xfffed, 20},
{0x1fffe6, 21},
{0x3fffe9, 22},
{0x1fffe7, 21},
{0x1fffe8, 21},
{0x7ffff3, 23},
{0x3fffea, 22},
{0x3fffeb, 22},
{0x1ffffee, 25},
{0x1ffffef, 25},
{0xfffff4, 24},
{0xfffff5, 24},
{0x3ffffea, 26},
{0x7ffff4, 23},
{0x3ffffeb, 26},
{0x7ffffe6, 27},
{0x3ffffec, 26},
{0x3ffffed, 26},
{0x7ffffe7, 27},
{0x7ffffe8, 27},
{0x7ffffe9, 27},
{0x7ffffea, 27},
{0x7ffffeb, 27},
{0xffffffe, 28},
{0x7ffffec, 27},
{0x7ffffed, 27},
{0x7ffffee, 27},
{0x7ffffef, 27},
{0x7fffff0, 27},
{0x3ffffee, 26},
{0x3fffffff, 30}, };
{0x1ff8, 13}, {0x7fffd8, 23}, {0xfffffe2, 28}, {0xfffffe3, 28},
{0xfffffe4, 28}, {0xfffffe5, 28}, {0xfffffe6, 28}, {0xfffffe7, 28},
{0xfffffe8, 28}, {0xffffea, 24}, {0x3ffffffc, 30}, {0xfffffe9, 28},
{0xfffffea, 28}, {0x3ffffffd, 30}, {0xfffffeb, 28}, {0xfffffec, 28},
{0xfffffed, 28}, {0xfffffee, 28}, {0xfffffef, 28}, {0xffffff0, 28},
{0xffffff1, 28}, {0xffffff2, 28}, {0x3ffffffe, 30}, {0xffffff3, 28},
{0xffffff4, 28}, {0xffffff5, 28}, {0xffffff6, 28}, {0xffffff7, 28},
{0xffffff8, 28}, {0xffffff9, 28}, {0xffffffa, 28}, {0xffffffb, 28},
{0x14, 6}, {0x3f8, 10}, {0x3f9, 10}, {0xffa, 12},
{0x1ff9, 13}, {0x15, 6}, {0xf8, 8}, {0x7fa, 11},
{0x3fa, 10}, {0x3fb, 10}, {0xf9, 8}, {0x7fb, 11},
{0xfa, 8}, {0x16, 6}, {0x17, 6}, {0x18, 6},
{0x0, 5}, {0x1, 5}, {0x2, 5}, {0x19, 6},
{0x1a, 6}, {0x1b, 6}, {0x1c, 6}, {0x1d, 6},
{0x1e, 6}, {0x1f, 6}, {0x5c, 7}, {0xfb, 8},
{0x7ffc, 15}, {0x20, 6}, {0xffb, 12}, {0x3fc, 10},
{0x1ffa, 13}, {0x21, 6}, {0x5d, 7}, {0x5e, 7},
{0x5f, 7}, {0x60, 7}, {0x61, 7}, {0x62, 7},
{0x63, 7}, {0x64, 7}, {0x65, 7}, {0x66, 7},
{0x67, 7}, {0x68, 7}, {0x69, 7}, {0x6a, 7},
{0x6b, 7}, {0x6c, 7}, {0x6d, 7}, {0x6e, 7},
{0x6f, 7}, {0x70, 7}, {0x71, 7}, {0x72, 7},
{0xfc, 8}, {0x73, 7}, {0xfd, 8}, {0x1ffb, 13},
{0x7fff0, 19}, {0x1ffc, 13}, {0x3ffc, 14}, {0x22, 6},
{0x7ffd, 15}, {0x3, 5}, {0x23, 6}, {0x4, 5},
{0x24, 6}, {0x5, 5}, {0x25, 6}, {0x26, 6},
{0x27, 6}, {0x6, 5}, {0x74, 7}, {0x75, 7},
{0x28, 6}, {0x29, 6}, {0x2a, 6}, {0x7, 5},
{0x2b, 6}, {0x76, 7}, {0x2c, 6}, {0x8, 5},
{0x9, 5}, {0x2d, 6}, {0x77, 7}, {0x78, 7},
{0x79, 7}, {0x7a, 7}, {0x7b, 7}, {0x7ffe, 15},
{0x7fc, 11}, {0x3ffd, 14}, {0x1ffd, 13}, {0xffffffc, 28},
{0xfffe6, 20}, {0x3fffd2, 22}, {0xfffe7, 20}, {0xfffe8, 20},
{0x3fffd3, 22}, {0x3fffd4, 22}, {0x3fffd5, 22}, {0x7fffd9, 23},
{0x3fffd6, 22}, {0x7fffda, 23}, {0x7fffdb, 23}, {0x7fffdc, 23},
{0x7fffdd, 23}, {0x7fffde, 23}, {0xffffeb, 24}, {0x7fffdf, 23},
{0xffffec, 24}, {0xffffed, 24}, {0x3fffd7, 22}, {0x7fffe0, 23},
{0xffffee, 24}, {0x7fffe1, 23}, {0x7fffe2, 23}, {0x7fffe3, 23},
{0x7fffe4, 23}, {0x1fffdc, 21}, {0x3fffd8, 22}, {0x7fffe5, 23},
{0x3fffd9, 22}, {0x7fffe6, 23}, {0x7fffe7, 23}, {0xffffef, 24},
{0x3fffda, 22}, {0x1fffdd, 21}, {0xfffe9, 20}, {0x3fffdb, 22},
{0x3fffdc, 22}, {0x7fffe8, 23}, {0x7fffe9, 23}, {0x1fffde, 21},
{0x7fffea, 23}, {0x3fffdd, 22}, {0x3fffde, 22}, {0xfffff0, 24},
{0x1fffdf, 21}, {0x3fffdf, 22}, {0x7fffeb, 23}, {0x7fffec, 23},
{0x1fffe0, 21}, {0x1fffe1, 21}, {0x3fffe0, 22}, {0x1fffe2, 21},
{0x7fffed, 23}, {0x3fffe1, 22}, {0x7fffee, 23}, {0x7fffef, 23},
{0xfffea, 20}, {0x3fffe2, 22}, {0x3fffe3, 22}, {0x3fffe4, 22},
{0x7ffff0, 23}, {0x3fffe5, 22}, {0x3fffe6, 22}, {0x7ffff1, 23},
{0x3ffffe0, 26}, {0x3ffffe1, 26}, {0xfffeb, 20}, {0x7fff1, 19},
{0x3fffe7, 22}, {0x7ffff2, 23}, {0x3fffe8, 22}, {0x1ffffec, 25},
{0x3ffffe2, 26}, {0x3ffffe3, 26}, {0x3ffffe4, 26}, {0x7ffffde, 27},
{0x7ffffdf, 27}, {0x3ffffe5, 26}, {0xfffff1, 24}, {0x1ffffed, 25},
{0x7fff2, 19}, {0x1fffe3, 21}, {0x3ffffe6, 26}, {0x7ffffe0, 27},
{0x7ffffe1, 27}, {0x3ffffe7, 26}, {0x7ffffe2, 27}, {0xfffff2, 24},
{0x1fffe4, 21}, {0x1fffe5, 21}, {0x3ffffe8, 26}, {0x3ffffe9, 26},
{0xffffffd, 28}, {0x7ffffe3, 27}, {0x7ffffe4, 27}, {0x7ffffe5, 27},
{0xfffec, 20}, {0xfffff3, 24}, {0xfffed, 20}, {0x1fffe6, 21},
{0x3fffe9, 22}, {0x1fffe7, 21}, {0x1fffe8, 21}, {0x7ffff3, 23},
{0x3fffea, 22}, {0x3fffeb, 22}, {0x1ffffee, 25}, {0x1ffffef, 25},
{0xfffff4, 24}, {0xfffff5, 24}, {0x3ffffea, 26}, {0x7ffff4, 23},
{0x3ffffeb, 26}, {0x7ffffe6, 27}, {0x3ffffec, 26}, {0x3ffffed, 26},
{0x7ffffe7, 27}, {0x7ffffe8, 27}, {0x7ffffe9, 27}, {0x7ffffea, 27},
{0x7ffffeb, 27}, {0xffffffe, 28}, {0x7ffffec, 27}, {0x7ffffed, 27},
{0x7ffffee, 27}, {0x7ffffef, 27}, {0x7fffff0, 27}, {0x3ffffee, 26},
{0x3fffffff, 30},
};

@ -169,11 +169,6 @@ typedef struct grpc_chttp2_outstanding_ping {
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
typedef struct {
grpc_status_code status;
gpr_slice debug;
} grpc_chttp2_pending_goaway;
typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
@ -218,8 +213,8 @@ typedef struct {
so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count;
/** is there a goaway available? */
gpr_uint8 have_goaway;
/** is there a goaway available? (boolean) */
grpc_chttp2_error_state goaway_state;
/** what is the debug text of the goaway? */
gpr_slice goaway_text;
/** what is the status code of the goaway? */
@ -282,9 +277,9 @@ struct grpc_chttp2_transport_parsing {
/* active parser */
void *parser_data;
grpc_chttp2_stream_parsing *incoming_stream;
grpc_chttp2_parse_error (*parser)(void *parser_user_data,
grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing,
gpr_slice slice, int is_last);
grpc_chttp2_parse_error (*parser)(
void *parser_user_data, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
/* received settings */
gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS];
@ -473,55 +468,106 @@ struct grpc_chttp2_stream {
};
/** Transport writing call flow:
chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes are required;
if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the writes.
Once writes have been completed (meaning another write could potentially be started),
grpc_chttp2_terminate_writing is called. This will call grpc_chttp2_cleanup_writing, at which
chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes
are required;
if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the
writes.
Once writes have been completed (meaning another write could potentially be
started),
grpc_chttp2_terminate_writing is called. This will call
grpc_chttp2_cleanup_writing, at which
point the write phase is complete. */
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing(
grpc_chttp2_transport_writing *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
/** Process one slice of incoming data */
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
/** Get a writable stream
\return non-zero if there was a stream available */
void grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
int grpc_chttp2_list_pop_parsing_seen_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success);
void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0)
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_written_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing);
int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
void grpc_chttp2_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing);
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text);
#define GRPC_CHTTP2_FLOW_CTL_TRACE(a, b, c, d, e) \
do { \
} while (0)
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING)-1)
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
extern int grpc_http_trace;

@ -41,29 +41,41 @@
#include <grpc/support/log.h>
static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_header_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
static int init_data_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing);
static int init_rst_stream_parser(
grpc_chttp2_transport_parsing *transport_parsing);
static int init_settings_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing);
static int init_window_update_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing);
static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing);
static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header);
static int init_skip_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing, int is_header);
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last);
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice, int is_last);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) {
void grpc_chttp2_publish_reads(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
/* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when
/* transport_parsing->last_incoming_stream_id is used as
last-grpc_chttp2_stream-id when
sending GOAWAY frame.
https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream ID. So,
says that last-grpc_chttp2_stream-id is peer-initiated grpc_chttp2_stream
ID. So,
since we don't have server pushed streams, client should send
GOAWAY last-grpc_chttp2_stream-id=0 in this case. */
if (!transport_parsing->is_client) {
transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id;
transport_global->last_incoming_stream_id =
transport_parsing->incoming_stream_id;
}
/* TODO(ctiller): re-implement */
@ -101,13 +113,15 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g
/* update global settings */
if (transport_parsing->settings_updated) {
memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings));
memcpy(transport_global->settings[PEER_SETTINGS],
transport_parsing->settings, sizeof(transport_parsing->settings));
transport_parsing->settings_updated = 0;
}
/* update settings based on ack if received */
if (transport_parsing->settings_ack_received) {
memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS],
memcpy(transport_global->settings[ACKED_SETTINGS],
transport_global->settings[SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
transport_parsing->settings_ack_received = 0;
}
@ -115,20 +129,19 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g
/* move goaway to the global state if we received one (it will be
published later */
if (transport_parsing->goaway_received) {
gpr_slice_unref(transport_global->goaway_text);
transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text);
transport_global->goaway_error = transport_parsing->goaway_error;
transport_global->have_goaway = 1;
grpc_chttp2_add_incoming_goaway(transport_global, transport_parsing->goaway_error, transport_parsing->goaway_text);
transport_parsing->goaway_received = 0;
}
/* for each stream that saw an update, fixup global state */
while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) {
while (grpc_chttp2_list_pop_parsing_seen_stream(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
/* update incoming flow control window */
if (stream_parsing->incoming_window_delta) {
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global);
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
stream_global);
}
/* update outgoing flow control window */
@ -145,7 +158,8 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g
}
}
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) {
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice) {
gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
@ -178,13 +192,16 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, g
case DTS_CLIENT_PREFIX_22:
case DTS_CLIENT_PREFIX_23:
while (cur != end && transport_parsing->deframe_state != DTS_FH_0) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state]) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
->deframe_state]) {
gpr_log(GPR_ERROR,
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d",
GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state],
(int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing->deframe_state], *cur,
(int)*cur, transport_parsing->deframe_state);
GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
->deframe_state],
(int)(gpr_uint8)GRPC_CHTTP2_CLIENT_CONNECT_STRING
[transport_parsing->deframe_state],
*cur, (int)*cur, transport_parsing->deframe_state);
return 0;
}
++cur;
@ -267,7 +284,8 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, g
return 0;
}
if (transport_parsing->incoming_stream_id) {
transport_parsing->last_incoming_stream_id = transport_parsing->incoming_stream_id;
transport_parsing->last_incoming_stream_id =
transport_parsing->incoming_stream_id;
}
if (transport_parsing->incoming_frame_size == 0) {
if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) {
@ -287,14 +305,18 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, g
GPR_ASSERT(cur < end);
if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) {
if (!parse_frame_slice(
transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
transport_parsing,
gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
return 0;
}
transport_parsing->deframe_state = DTS_FH_0;
return 1;
} else if ((gpr_uint32)(end - cur) > transport_parsing->incoming_frame_size) {
} else if ((gpr_uint32)(end - cur) >
transport_parsing->incoming_frame_size) {
if (!parse_frame_slice(
transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg,
transport_parsing,
gpr_slice_sub_no_ref(
slice, cur - beg,
cur + transport_parsing->incoming_frame_size - beg),
1)) {
return 0;
@ -303,7 +325,8 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, g
goto dts_fh_0; /* loop */
} else {
if (!parse_frame_slice(
transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
transport_parsing,
gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
return 0;
}
transport_parsing->incoming_frame_size -= (end - cur);
@ -321,15 +344,19 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, g
static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
if (transport_parsing->expect_continuation_stream_id != 0) {
if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
if (transport_parsing->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
transport_parsing->incoming_frame_type);
return 0;
}
if (transport_parsing->expect_continuation_stream_id != transport_parsing->incoming_stream_id) {
if (transport_parsing->expect_continuation_stream_id !=
transport_parsing->incoming_stream_id) {
gpr_log(GPR_ERROR,
"Expected CONTINUATION frame for grpc_chttp2_stream %08x, got grpc_chttp2_stream %08x",
transport_parsing->expect_continuation_stream_id, transport_parsing->incoming_stream_id);
"Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
"grpc_chttp2_stream %08x",
transport_parsing->expect_continuation_stream_id,
transport_parsing->incoming_stream_id);
return 0;
}
return init_header_frame_parser(transport_parsing, 1);
@ -353,20 +380,22 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
case GRPC_CHTTP2_FRAME_GOAWAY:
return init_goaway_parser(transport_parsing);
default:
gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type);
gpr_log(GPR_ERROR, "Unknown frame type %02x",
transport_parsing->incoming_frame_type);
return init_skip_frame_parser(transport_parsing, 0);
}
}
static grpc_chttp2_parse_error skip_parser(void *parser,
grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing,
gpr_slice slice, int is_last) {
static grpc_chttp2_parse_error skip_parser(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
return GRPC_CHTTP2_PARSE_OK;
}
static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
static int init_skip_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
if (is_header) {
int is_eoh = transport_parsing->expect_continuation_stream_id != 0;
transport_parsing->parser = grpc_chttp2_header_parser_parse;
@ -374,52 +403,68 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi
transport_parsing->hpack_parser.on_header = skip_header;
transport_parsing->hpack_parser.on_header_user_data = NULL;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
transport_parsing->hpack_parser.is_eof =
is_eoh ? transport_parsing->header_eof : 0;
} else {
transport_parsing->parser = skip_parser;
}
return 1;
}
static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) {
init_skip_frame_parser(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse);
static void become_skip_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
init_skip_frame_parser(
transport_parsing,
transport_parsing->parser == grpc_chttp2_header_parser_parse);
}
static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
if (transport_parsing->incoming_frame_size > transport_parsing->incoming_window) {
static grpc_chttp2_parse_error update_incoming_window(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
if (transport_parsing->incoming_frame_size >
transport_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size, transport_parsing->incoming_window);
transport_parsing->incoming_frame_size,
transport_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (transport_parsing->incoming_frame_size > stream_parsing->incoming_window) {
if (transport_parsing->incoming_frame_size >
stream_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size, stream_parsing->incoming_window);
transport_parsing->incoming_frame_size,
stream_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->global.id, -(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOW_CTL_TRACE(
t, t, incoming, 0, -(gpr_int64)transport_parsing->incoming_frame_size);
GRPC_CHTTP2_FLOW_CTL_TRACE(
t, s, incoming, s->global.id,
-(gpr_int64)transport_parsing->incoming_frame_size);
transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
/* if the grpc_chttp2_stream incoming window is getting low, schedule an update */
stream_parsing->incoming_window_changed = 1;
stream_parsing->incoming_window_delta +=
transport_parsing->incoming_frame_size;
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
return GRPC_CHTTP2_PARSE_OK;
}
static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
static int init_data_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing =
grpc_chttp2_parsing_lookup_stream(transport_parsing,
transport_parsing->incoming_stream_id);
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
if (!stream_parsing || stream_parsing->received_close) return init_skip_frame_parser(transport_parsing, 0);
if (!stream_parsing || stream_parsing->received_close)
return init_skip_frame_parser(transport_parsing, 0);
if (err == GRPC_CHTTP2_PARSE_OK) {
err = update_incoming_window(transport_parsing, stream_parsing);
}
if (err == GRPC_CHTTP2_PARSE_OK) {
err = grpc_chttp2_data_parser_begin_frame(&stream_parsing->data_parser,
transport_parsing->incoming_frame_flags);
err = grpc_chttp2_data_parser_begin_frame(
&stream_parsing->data_parser, transport_parsing->incoming_frame_flags);
}
switch (err) {
case GRPC_CHTTP2_PARSE_OK:
@ -441,26 +486,32 @@ static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsi
static void free_timeout(void *p) { gpr_free(p); }
static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing, grpc_mdelem *elem) {
if (stream_parsing->incoming_metadata_capacity == stream_parsing->incoming_metadata_count) {
static void add_incoming_metadata(grpc_chttp2_stream_parsing *stream_parsing,
grpc_mdelem *elem) {
if (stream_parsing->incoming_metadata_capacity ==
stream_parsing->incoming_metadata_count) {
stream_parsing->incoming_metadata_capacity =
GPR_MAX(8, 2 * stream_parsing->incoming_metadata_capacity);
stream_parsing->incoming_metadata =
gpr_realloc(stream_parsing->incoming_metadata, sizeof(*stream_parsing->incoming_metadata) *
gpr_realloc(stream_parsing->incoming_metadata,
sizeof(*stream_parsing->incoming_metadata) *
stream_parsing->incoming_metadata_capacity);
}
stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++].md = elem;
stream_parsing->incoming_metadata[stream_parsing->incoming_metadata_count++]
.md = elem;
}
static void on_header(void *tp, grpc_mdelem *md) {
grpc_chttp2_transport_parsing *transport_parsing = tp;
grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
GPR_ASSERT(stream_parsing);
IF_TRACING(gpr_log(
GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id,
transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value)));
if (md->key == transport_parsing->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
@ -475,7 +526,8 @@ static void on_header(void *tp, grpc_mdelem *md) {
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
stream_parsing->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
stream_parsing->incoming_deadline =
gpr_time_add(gpr_now(), *cached_timeout);
grpc_mdelem_unref(md);
} else {
add_incoming_metadata(stream_parsing, md);
@ -484,49 +536,61 @@ static void on_header(void *tp, grpc_mdelem *md) {
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
}
static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
int is_eoh =
(transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
static int init_header_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
int is_eoh = (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
grpc_chttp2_stream_parsing *stream_parsing;
if (is_eoh) {
transport_parsing->expect_continuation_stream_id = 0;
} else {
transport_parsing->expect_continuation_stream_id = transport_parsing->incoming_stream_id;
transport_parsing->expect_continuation_stream_id =
transport_parsing->incoming_stream_id;
}
if (!is_continuation) {
transport_parsing->header_eof =
(transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
transport_parsing->header_eof = (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
}
/* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
stream_parsing = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
if (!stream_parsing) {
if (is_continuation) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
gpr_log(GPR_ERROR,
"grpc_chttp2_stream disbanded before CONTINUATION received");
return init_skip_frame_parser(transport_parsing, 1);
}
if (transport_parsing->is_client) {
if ((transport_parsing->incoming_stream_id & 1) &&
transport_parsing->incoming_stream_id < transport_parsing->next_stream_id) {
transport_parsing->incoming_stream_id <
transport_parsing->next_stream_id) {
/* this is an old (probably cancelled) grpc_chttp2_stream */
} else {
gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
gpr_log(GPR_ERROR,
"ignoring new grpc_chttp2_stream creation on client");
}
return init_skip_frame_parser(transport_parsing, 1);
} else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) {
} else if (transport_parsing->last_incoming_stream_id >
transport_parsing->incoming_stream_id) {
gpr_log(GPR_ERROR,
"ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
"ignoring out of order new grpc_chttp2_stream request on server; "
"last grpc_chttp2_stream "
"id=%d, new grpc_chttp2_stream id=%d",
transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id);
transport_parsing->last_incoming_stream_id,
transport_parsing->incoming_stream_id);
return init_skip_frame_parser(transport_parsing, 1);
} else if ((transport_parsing->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
gpr_log(GPR_ERROR,
"ignoring grpc_chttp2_stream with non-client generated index %d",
transport_parsing->incoming_stream_id);
return init_skip_frame_parser(transport_parsing, 1);
}
stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id);
stream_parsing = transport_parsing->incoming_stream =
grpc_chttp2_parsing_accept_stream(
transport_parsing, transport_parsing->incoming_stream_id);
if (!stream_parsing) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
return init_skip_frame_parser(transport_parsing, 1);
@ -544,15 +608,17 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par
transport_parsing->hpack_parser.on_header = on_header;
transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof = is_eoh ? transport_parsing->header_eof : 0;
if (!is_continuation &&
(transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
transport_parsing->hpack_parser.is_eof =
is_eoh ? transport_parsing->header_eof : 0;
if (!is_continuation && (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser);
}
return 1;
}
static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
static int init_window_update_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
&transport_parsing->simple.window_update,
transport_parsing->incoming_frame_size,
@ -563,8 +629,8 @@ static int init_window_update_frame_parser(grpc_chttp2_transport_parsing *transp
}
static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_ping_parser_begin_frame(&transport_parsing->simple.ping,
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame(
&transport_parsing->simple.ping,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags);
transport_parsing->parser = grpc_chttp2_ping_parser_parse;
@ -572,7 +638,8 @@ static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
return ok;
}
static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsing) {
static int init_rst_stream_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
&transport_parsing->simple.rst_stream,
transport_parsing->incoming_frame_size,
@ -582,28 +649,32 @@ static int init_rst_stream_parser(grpc_chttp2_transport_parsing *transport_parsi
return ok;
}
static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_goaway_parser_begin_frame(
&transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags);
static int init_goaway_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame(
&transport_parsing->goaway_parser,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags);
transport_parsing->parser = grpc_chttp2_goaway_parser_parse;
transport_parsing->parser_data = &transport_parsing->goaway_parser;
return ok;
}
static int init_settings_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
static int init_settings_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing) {
int ok;
if (transport_parsing->incoming_stream_id != 0) {
gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d", transport_parsing->incoming_stream_id);
gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d",
transport_parsing->incoming_stream_id);
return 0;
}
ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_settings_parser_begin_frame(
&transport_parsing->simple.settings, transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags, transport_parsing->settings);
ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_settings_parser_begin_frame(
&transport_parsing->simple.settings,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_frame_flags,
transport_parsing->settings);
if (!ok) {
return 0;
}
@ -623,7 +694,9 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
}
*/
void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_metadata_batch b;
b.list.head = NULL;
@ -639,7 +712,8 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) {
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_stream_op *ops = stream_global->incoming_sopb->ops;
size_t nops = stream_global->incoming_sopb->nops;
size_t i;
@ -663,10 +737,13 @@ static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_ch
GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1];
op->data.metadata.list.tail =
&stream_parsing->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j];
stream_parsing->incoming_metadata[j].prev =
&stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next =
&stream_parsing->incoming_metadata[j];
}
stream_parsing->incoming_metadata[mdidx].prev = NULL;
stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
@ -678,12 +755,14 @@ static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_ch
if (mdidx != stream_parsing->incoming_metadata_count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count;
size_t copy_bytes =
sizeof(*stream_parsing->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(stream_parsing->old_incoming_metadata + mdidx, stream_parsing->incoming_metadata,
copy_bytes);
stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count;
memcpy(stream_parsing->old_incoming_metadata + mdidx,
stream_parsing->incoming_metadata, copy_bytes);
stream_parsing->incoming_metadata_count =
stream_parsing->incoming_metadata_capacity = new_count;
} else {
stream_parsing->incoming_metadata = NULL;
stream_parsing->incoming_metadata_count = 0;
@ -692,12 +771,17 @@ static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_ch
}
}
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) {
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data,
transport_parsing, stream_parsing, slice,
is_last)) {
case GRPC_CHTTP2_PARSE_OK:
if (stream_parsing) {
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
@ -714,10 +798,6 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, g
return 0;
}
#if 0
if (st.end_of_stream) {
transport_parsing->incoming_stream->read_closed = 1;

@ -67,7 +67,8 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map,
gpr_uint32 key);
/* Move all elements of src into dst */
void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst);
void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src,
grpc_chttp2_stream_map *dst);
/* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, gpr_uint32 key);

@ -66,8 +66,7 @@ void grpc_chttp2_hpack_write_varint_tail(gpr_uint32 tail_value,
} else { \
(tgt)[0] = (prefix_or) | GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits); \
grpc_chttp2_hpack_write_varint_tail( \
(n) - GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits), (tgt) + 1, \
(length) - 1); \
(n)-GRPC_CHTTP2_MAX_IN_PREFIX(prefix_bits), (tgt) + 1, (length)-1); \
} \
} while (0)

@ -39,7 +39,9 @@
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
gpr_uint32 window_delta;
@ -48,25 +50,32 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *transport_g
gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
GPR_ASSERT(transport_global->qbuf.count == 0);
if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) {
if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings) {
gpr_slice_buffer_add(
&transport_writing->outbuf, grpc_chttp2_settings_create(
transport_global->settings[SENT_SETTINGS], transport_global->settings[LOCAL_SETTINGS],
transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
&transport_writing->outbuf,
grpc_chttp2_settings_create(transport_global->settings[SENT_SETTINGS],
transport_global->settings[LOCAL_SETTINGS],
transport_global->force_send_settings,
GRPC_CHTTP2_NUM_SETTINGS));
transport_global->force_send_settings = 0;
transport_global->dirtied_local_settings = 0;
transport_global->sent_local_settings = 1;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data (according to
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to
available window sizes) and add to the output buffer */
while (transport_global->outgoing_window &&
grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, &stream_writing) &&
grpc_chttp2_list_pop_writable_stream(transport_global,
transport_writing, &stream_global,
&stream_writing) &&
stream_global->outgoing_window > 0) {
stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
GPR_MIN(transport_global->outgoing_window, stream_global->outgoing_window),
GPR_MIN(transport_global->outgoing_window,
stream_global->outgoing_window),
&stream_writing->sopb);
GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
@ -81,51 +90,63 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *transport_g
stream_writing->send_closed = SEND_CLOSED;
}
}
if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != DONT_SEND_CLOSED) {
if (stream_writing->sopb.nops > 0 ||
stream_writing->send_closed != DONT_SEND_CLOSED) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
/* we should either exhaust window or have no ops left, but not both */
if (stream_global->outgoing_sopb->nops == 0) {
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1);
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
} else if (stream_global->outgoing_window) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
/* for each grpc_chttp2_stream that wants to update its window, add that window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) {
/* for each grpc_chttp2_stream that wants to update its window, add that
* window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
&stream_global)) {
window_delta =
transport_global->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
transport_global->settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
stream_global->incoming_window;
if (!stream_global->read_closed && window_delta > 0) {
gpr_slice_buffer_add(
&transport_writing->outbuf, grpc_chttp2_window_update_create(stream_global->id, window_delta));
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_global->id, window_delta));
GRPC_CHTTP2_FLOW_CTL_TRACE(t, s, incoming, s->id, window_delta);
stream_global->incoming_window += window_delta;
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here also */
if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) {
window_delta = transport_global->connection_window_target - transport_global->incoming_window;
/* if the grpc_chttp2_transport is ready to send a window update, do so here
* also */
if (transport_global->incoming_window <
transport_global->connection_window_target * 3 / 4) {
window_delta = transport_global->connection_window_target -
transport_global->incoming_window;
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, window_delta));
GRPC_CHTTP2_FLOW_CTL_TRACE(t, t, incoming, 0, window_delta);
transport_global->incoming_window += window_delta;
}
return transport_writing->outbuf.length > 0 || grpc_chttp2_list_have_writing_streams(transport_writing);
return transport_writing->outbuf.length > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
finalize_outbuf(transport_writing);
GPR_ASSERT(transport_writing->outbuf.count > 0);
switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, transport_writing->outbuf.count,
finish_write_cb, transport_writing)) {
switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
transport_writing->outbuf.count, finish_write_cb,
transport_writing)) {
case GRPC_ENDPOINT_WRITE_DONE:
grpc_chttp2_terminate_writing(transport_writing, 1);
break;
@ -140,14 +161,17 @@ void grpc_chttp2_perform_writes(grpc_chttp2_transport_writing *transport_writing
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
while (grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->id,
&transport_writing->hpack_compressor, &transport_writing->outbuf);
stream_writing->send_closed != DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf);
stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create(
stream_writing->id, GRPC_CHTTP2_NO_ERROR));
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(stream_writing->id,
GRPC_CHTTP2_NO_ERROR));
}
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
@ -155,14 +179,18 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
grpc_chttp2_transport_writing *transport_writing = tw;
grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK);
grpc_chttp2_terminate_writing(transport_writing,
write_status == GRPC_ENDPOINT_CB_OK);
}
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) {
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->send_closed != DONT_SEND_CLOSED) {
stream_global->write_state = WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {

@ -64,31 +64,54 @@ int grpc_flowctl_trace = 0;
else \
flowctl_trace(t, #dir, obj->dir##_window, id, delta)
#define TRANSPORT_FROM_WRITING(tw) ((grpc_chttp2_transport*)((char*)(tw) - offsetof(grpc_chttp2_transport, writing)))
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
/** Endpoint callback to process incoming data */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
/* basic stream list management */
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
static void stream_list_add_tail(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id);
/** schedule a closure to be called outside of the transport lock after the next
unlock() operation */
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
int success);
#if 0
static void unlock_check_cancellations(grpc_chttp2_transport *t);
static void unlock_check_parser(grpc_chttp2_transport *t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void writing_action(void *t, int iomgr_success_ignored);
static void notify_closed(void *t, int iomgr_success_ignored);
static void drop_connection(grpc_chttp2_transport *t);
static void end_all_the_calls(grpc_chttp2_transport *t);
static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id);
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id);
static void cancel_stream_id(grpc_chttp2_transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst);
@ -96,24 +119,27 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst);
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id);
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t,
gpr_uint32 id);
static void remove_from_stream_map(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
static void maybe_start_some_streams(grpc_chttp2_transport *t);
static void parsing_become_skip_parser(grpc_chttp2_transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success);
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser);
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset);
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op);
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
int is_parser);
static void maybe_join_window_updates(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_transport_op *op);
static void add_metadata_batch(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
#endif
static void flowctl_trace(grpc_chttp2_transport *t, const char *flow, gpr_int32 window,
gpr_uint32 id, gpr_int32 delta) {
static void flowctl_trace(grpc_chttp2_transport *t, const char *flow,
gpr_int32 window, gpr_uint32 id, gpr_int32 delta) {
gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
delta, window + delta);
}
@ -176,15 +202,17 @@ static void unref_transport(grpc_chttp2_transport *t) {
static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callback setup,
void *arg, const grpc_channel_args *channel_args,
static void init_transport(grpc_chttp2_transport *t,
grpc_transport_setup_callback setup, void *arg,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
grpc_mdctx *mdctx, int is_client) {
size_t i;
int j;
grpc_transport_setup_result sr;
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
memset(t, 0, sizeof(*t));
@ -220,7 +248,8 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
if (is_client) {
gpr_slice_buffer_add(&t->global.qbuf,
gpr_slice_buffer_add(
&t->global.qbuf,
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
@ -234,7 +263,8 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
t->global.settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
t->global.settings[i][j] =
grpc_chttp2_settings_parameters[j].default_value;
}
}
t->global.dirtied_local_settings = 1;
@ -272,7 +302,8 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
} else if ((t->global.next_stream_id & 1) !=
(channel_args->args[i].value.integer & 1)) {
gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1,
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
t->global.next_stream_id & 1,
is_client ? "client" : "server");
} else {
t->global.next_stream_id = channel_args->args[i].value.integer;
@ -355,9 +386,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->global
.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->global
.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
}
@ -375,7 +408,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
@ -400,11 +434,13 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
static int stream_list_empty(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
static int stream_list_empty(grpc_chttp2_transport *t,
grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@ -421,7 +457,8 @@ static grpc_chttp2_stream *stream_list_remove_head(grpc_chttp2_transport *t, grp
return s;
}
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
s->included[id] = 0;
if (s->links[id].prev) {
@ -437,7 +474,9 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}
static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
static void stream_list_add_tail(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *old_tail;
GPR_ASSERT(!s->included[id]);
old_tail = t->lists[id].tail;
@ -453,7 +492,8 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s
s->included[id] = 1;
}
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) {
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
@ -475,7 +515,8 @@ static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream
* LOCK MANAGEMENT
*/
/* We take a grpc_chttp2_transport-global lock in response to calls coming in from above,
/* We take a grpc_chttp2_transport-global lock in response to calls coming in
from above,
and in response to data being received from below. New data to be written
is always queued, as are callbacks to process data. During unlock() we
check our todo lists and initiate callbacks and flush writes. */
@ -485,14 +526,15 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
if (!t->writing_active &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
ref_transport(t);
schedule_cb(t, &t->writing_action, 1);
}
unlock_check_cancellations(t);
unlock_check_parser(t);
unlock_check_channel_callbacks(t);
/* unlock_check_cancellations(t); */
/* unlock_check_parser(t); */
/* unlock_check_channel_callbacks(t); */
run_closures = t->global.pending_closures;
t->global.pending_closures = NULL;
@ -525,7 +567,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writing, int success) {
void grpc_chttp2_terminate_writing(
grpc_chttp2_transport_writing *transport_writing, int success) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@ -551,34 +594,38 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
unref_transport(t);
}
static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
grpc_chttp2_perform_writes(&t->writing, t->ep);
}
static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
gpr_slice_unref(t->channel_callback.goaway_text);
t->channel_callback.have_goaway = 1;
t->channel_callback.goaway_text = goaway_text;
t->channel_callback.goaway_error = goaway_error;
if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
transport_global->goaway_text = goaway_text;
transport_global->goaway_error = goaway_error;
} else {
gpr_slice_unref(goaway_text);
}
}
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
/* start streams where we have free grpc_chttp2_stream ids and free concurrency */
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID &&
t->global.concurrent_stream_count <
t->global.settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
(s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
IF_TRACING(gpr_log(
GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) {
add_goaway(
t, GRPC_CHTTP2_NO_ERROR,
grpc_chttp2_add_incoming_goaway(
&t->global, GRPC_CHTTP2_NO_ERROR,
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
@ -586,15 +633,18 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
s->global.id = t->global.next_stream_id;
t->global.next_stream_id += 2;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->global
.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->global
.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
t->global.concurrent_stream_count++;
stream_list_join(t, s, WRITABLE);
}
/* cancel out streams that will never be started */
while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID &&
(s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
cancel_stream(
t, s, GRPC_STATUS_UNAVAILABLE,
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL,
@ -816,9 +866,9 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i
stream_list_join(t, s, FINISHED_READ_OP);
}
}
#endif
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
static void maybe_join_window_updates(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if (t->parsing.executing) {
stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return;
@ -832,7 +882,6 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre
}
}
#if 0
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
@ -867,15 +916,19 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->parsing_active = 1;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++)
for (i = 0;
i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
i++)
;
gpr_mu_lock(&t->mu);
if (i != nslices) {
drop_connection(t);
}
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
t->global.concurrent_stream_count = grpc_stream_map_size(&t->parsing_stream_map);
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
t->global.concurrent_stream_count =
grpc_chttp2_stream_map_size(&t->parsing_stream_map);
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->parsing_active = 0;
@ -932,25 +985,18 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
typedef struct {
grpc_chttp2_transport *t;
grpc_chttp2_pending_goaway *goaways;
size_t num_goaways;
gpr_uint32 error;
gpr_slice text;
grpc_iomgr_closure closure;
} notify_goaways_args;
static void notify_goaways(void *p, int iomgr_success_ignored) {
size_t i;
notify_goaways_args *a = p;
grpc_chttp2_transport *t = a->t;
for (i = 0; i < a->num_goaways; i++) {
t->channel_callback.cb->goaway(
t->channel_callback.cb_user_data,
&t->base,
a->goaways[i].status,
a->goaways[i].debug);
}
t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base,
a->error, a->text);
gpr_free(a->goaways);
gpr_free(a);
lock(t);
@ -960,49 +1006,47 @@ static void notify_goaways(void *p, int iomgr_success_ignored) {
unref_transport(t);
}
static void notify_closed(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
lock(t);
t->channel_callback.executing = 0;
unlock(t);
unref_transport(t);
}
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
if (t->channel_callback.executing) {
return;
}
if (t->parsing.executing) {
return;
}
if (t->num_pending_goaways) {
if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) {
if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN &&
t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
notify_goaways_args *a = gpr_malloc(sizeof(*a));
a->goaways = t->pending_goaways;
a->num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
a->error = t->global.goaway_error;
a->text = t->global.goaway_text;
t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
t->channel_callback.executing = 1;
grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
ref_transport(t);
schedule_cb(t, &a->closure, 1);
return;
}
if (t->writing.executing) {
} else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
return;
}
if (t->error_state == ERROR_STATE_SEEN) {
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
t->channel_callback.executing = 1;
ref_transport(t);
schedule_cb(t, &t->channel_callback.notify_closed, 1);
}
}
static void notify_closed(void *gt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
lock(t);
t->channel_callback.executing = 0;
unlock(t);
unref_transport(t);
}
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, int success) {
static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
closure->next = t->global.pending_closures;
t->global.pending_closures = closure;
@ -1012,7 +1056,8 @@ static void schedule_cb(grpc_chttp2_transport *t, grpc_iomgr_closure *closure, i
* POLLSET STUFF
*/
static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset) {
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
@ -1029,10 +1074,15 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
* INTEGRATION GLUE
*/
static const grpc_transport_vtable vtable = {
sizeof(grpc_chttp2_stream), init_stream, perform_op,
add_to_pollset, destroy_stream, goaway,
close_transport, send_ping, destroy_transport};
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
init_stream,
perform_op,
add_to_pollset,
destroy_stream,
goaway,
close_transport,
send_ping,
destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,

@ -73,9 +73,8 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);
void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb) {
transport->vtable->ping(transport, cb);
}
void grpc_transport_setup_cancel(grpc_transport_setup *setup) {

@ -164,11 +164,8 @@ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
/* Send a ping on a transport
Calls cb with user data when a response is received.
cb *MAY* be called with arbitrary transport level locks held. It is not safe
to call into the transport during cb. */
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
Calls cb with user data when a response is received. */
void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb);
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,

@ -63,8 +63,7 @@ typedef struct grpc_transport_vtable {
void (*close)(grpc_transport *self);
/* implementation of grpc_transport_ping */
void (*ping)(grpc_transport *self, void (*cb)(void *user_data),
void *user_data);
void (*ping)(grpc_transport *self, grpc_iomgr_closure *cb);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_transport *self);

Loading…
Cancel
Save