Get parsing/writing compiling again

pull/2149/head
Craig Tiller 10 years ago
parent d20efd26e3
commit 3719f07233
  1. 19
      src/core/transport/chttp2/frame_ping.c
  2. 8
      src/core/transport/chttp2/frame_rst_stream.c
  3. 9
      src/core/transport/chttp2/frame_settings.c
  4. 12
      src/core/transport/chttp2/frame_window_update.c
  5. 12
      src/core/transport/chttp2/hpack_parser.c
  6. 36
      src/core/transport/chttp2/internal.h
  7. 73
      src/core/transport/chttp2/parsing.c
  8. 8
      src/core/transport/chttp2/writing.c

@ -32,9 +32,11 @@
*/
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/internal.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
gpr_slice grpc_chttp2_ping_create(gpr_uint8 ack, gpr_uint8 *opaque_8bytes) {
@ -67,12 +69,14 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_ping_parser *p = parser;
grpc_chttp2_outstanding_ping *ping;
while (p->byte != 8 && cur != end) {
p->opaque_8bytes[p->byte] = *cur;
@ -83,9 +87,18 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
state->process_ping_reply = 1;
for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) {
if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
grpc_iomgr_add_delayed_callback(ping->on_recv, 1);
}
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
}
} else {
state->send_ping_ack = 1;
gpr_slice_buffer_add(
&transport_parsing->qbuf,
grpc_chttp2_ping_create(1, p->opaque_8bytes));
}
}

@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/internal.h"
#include <grpc/support/log.h>
@ -69,7 +70,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@ -84,8 +85,9 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
if (p->byte == 4) {
GPR_ASSERT(is_last);
state->rst_stream = 1;
state->rst_stream_reason =
stream_parsing->received_close = 1;
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason =
(((gpr_uint32)p->reason_bytes[0]) << 24) |
(((gpr_uint32)p->reason_bytes[1]) << 16) |
(((gpr_uint32)p->reason_bytes[2]) << 8) |

@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/internal.h"
#include <string.h>
@ -137,7 +138,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
void *p, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last) {
void *p, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_settings_parser *parser = p;
const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice);
const gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
@ -154,7 +155,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
if (is_last) {
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
state->ack_settings = 1;
gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create());
}
return GRPC_CHTTP2_PARSE_OK;
}
@ -220,11 +221,11 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
state->initial_window_update =
transport_parsing->initial_window_update =
(gpr_int64)parser->value -
parser->incoming_settings[parser->id];
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
(int)state->initial_window_update);
(int)transport_parsing->initial_window_update);
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {

@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/internal.h"
#include <grpc/support/log.h>
@ -73,7 +74,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
}
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
void *parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
@ -92,7 +93,14 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
return GRPC_CHTTP2_CONNECTION_ERROR;
}
GPR_ASSERT(is_last);
state->window_update = p->amount;
if (transport_parsing->incoming_stream_id) {
if (stream_parsing) {
stream_parsing->outgoing_window_update += p->amount;
}
} else {
transport_parsing->outgoing_window_update += p->amount;
}
}
return GRPC_CHTTP2_PARSE_OK;

@ -32,6 +32,7 @@
*/
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/internal.h"
#include <stddef.h>
#include <string.h>
@ -1369,7 +1370,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
}
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
void *hpack_parser, grpc_chttp2_parse_state *state, gpr_slice slice,
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice,
int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
@ -1382,9 +1383,12 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
"end of header frame not aligned with a hpack record boundary");
return GRPC_CHTTP2_CONNECTION_ERROR;
}
state->metadata_boundary = parser->is_boundary;
state->end_of_stream = parser->is_eof;
state->need_flush_reads = parser->is_eof;
if (parser->is_boundary) {
grpc_chttp2_parsing_add_metadata_batch(transport_parsing, stream_parsing);
}
if (parser->is_eof) {
stream_parsing->received_close = 1;
}
parser->on_header = on_header_not_set;
parser->on_header_user_data = NULL;
parser->is_boundary = 0xde;

@ -162,10 +162,11 @@ typedef enum {
} grpc_chttp2_setting_set;
/* Outstanding ping request data */
typedef struct {
typedef struct grpc_chttp2_outstanding_ping {
gpr_uint8 id[8];
void (*cb)(void *user_data);
void *user_data;
grpc_iomgr_closure *on_recv;
struct grpc_chttp2_outstanding_ping *next;
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
typedef struct {
@ -181,10 +182,13 @@ typedef struct {
/** window available for us to send to peer */
gpr_uint32 outgoing_window;
/** window available for peer to send to us - updated after parse */
gpr_uint32 incoming_window;
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
/** is this transport a client? */
gpr_uint8 is_client;
/** are the local settings dirty and need to be sent? */
gpr_uint8 dirtied_local_settings;
/** have local settings been sent? */
@ -196,6 +200,9 @@ typedef struct {
/** last received stream id */
gpr_uint32 last_incoming_stream_id;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
} grpc_chttp2_transport_global;
typedef struct {
@ -216,6 +223,9 @@ struct grpc_chttp2_transport_parsing {
/** was a goaway frame received? */
gpr_uint8 goaway_received;
/** initial window change */
gpr_int64 initial_window_update;
/** data to write later - after parsing */
gpr_slice_buffer qbuf;
/* metadata object cache */
@ -262,6 +272,11 @@ struct grpc_chttp2_transport_parsing {
grpc_status_code goaway_error;
gpr_uint32 goaway_last_stream_index;
gpr_slice goaway_text;
gpr_uint64 outgoing_window_update;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
};
@ -306,9 +321,6 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream_map stream_map;
/* pings */
grpc_chttp2_outstanding_ping *pings;
size_t ping_count;
size_t ping_capacity;
gpr_int64 ping_counter;
grpc_chttp2_transport_global global;
@ -339,6 +351,8 @@ typedef struct {
/** window available for us to send to peer */
gpr_int64 outgoing_window;
/** window available for peer to send to us - updated after parse */
gpr_uint32 incoming_window;
/** stream ops the transport user would like to send */
grpc_stream_op_buffer *outgoing_sopb;
/** when the application requests writes be closed, the write_closed is
@ -367,10 +381,16 @@ struct grpc_chttp2_stream_parsing {
gpr_uint8 incoming_window_changed;
/** saw an error on this stream during parsing (it should be cancelled) */
gpr_uint8 saw_error;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
/** window available for peer to send to us */
gpr_uint32 incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** reason give to rst_stream */
gpr_uint32 rst_stream_reason;
/* amount of window given */
gpr_uint64 outgoing_window_update;
/* incoming metadata */
grpc_linked_mdelem *incoming_metadata;
@ -440,6 +460,8 @@ void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transpor
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
#define GRPC_CHTTP2_FLOW_CTL_TRACE(a,b,c,d,e) do {} while (0)
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

@ -275,14 +275,14 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
}
static grpc_chttp2_parse_error skip_parser(void *parser,
grpc_chttp2_parse_state *st,
grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing,
gpr_slice slice, int is_last) {
return GRPC_CHTTP2_PARSE_OK;
}
static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
if (is_header) {
int is_eoh = transport_parsing->expect_continuation_stream_id != 0;
transport_parsing->parser = grpc_chttp2_header_parser_parse;
@ -298,7 +298,7 @@ static int init_skip_frame(grpc_chttp2_transport_parsing *transport_parsing, int
}
static void become_skip_parser(grpc_chttp2_transport_parsing *transport_parsing) {
init_skip_frame(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse);
init_skip_frame_parser(transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse);
}
static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
@ -329,7 +329,7 @@ static grpc_chttp2_parse_error update_incoming_window(grpc_chttp2_transport_pars
static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id);
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
if (!stream_parsing || stream_parsing->received_close) return init_skip_frame(transport_parsing, 0);
if (!stream_parsing || stream_parsing->received_close) return init_skip_frame_parser(transport_parsing, 0);
if (err == GRPC_CHTTP2_PARSE_OK) {
err = update_incoming_window(transport_parsing, stream_parsing);
}
@ -346,7 +346,7 @@ static int init_data_frame_parser(grpc_chttp2_transport_parsing *transport_parsi
case GRPC_CHTTP2_STREAM_ERROR:
stream_parsing->received_close = 1;
stream_parsing->saw_error = 1;
return init_skip_frame(transport_parsing, 0);
return init_skip_frame_parser(transport_parsing, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
}
@ -421,7 +421,7 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par
if (!stream_parsing) {
if (is_continuation) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received");
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
}
if (transport_parsing->is_client) {
if ((transport_parsing->incoming_stream_id & 1) &&
@ -430,22 +430,22 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par
} else {
gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client");
}
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
} else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) {
gpr_log(GPR_ERROR,
"ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream "
"id=%d, new grpc_chttp2_stream id=%d",
transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id);
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
} else if ((transport_parsing->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d",
transport_parsing->incoming_stream_id);
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
}
stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream(transport_parsing, transport_parsing->incoming_stream_id);
if (!stream_parsing) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
}
} else {
transport_parsing->incoming_stream = stream_parsing;
@ -453,7 +453,7 @@ static int init_header_frame_parser(grpc_chttp2_transport_parsing *transport_par
if (stream_parsing->received_close) {
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
transport_parsing->incoming_stream = NULL;
return init_skip_frame(transport_parsing, 1);
return init_skip_frame_parser(transport_parsing, 1);
}
transport_parsing->parser = grpc_chttp2_header_parser_parse;
transport_parsing->parser_data = &transport_parsing->hpack_parser;
@ -539,7 +539,7 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
}
*/
static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) {
grpc_metadata_batch b;
b.list.head = NULL;
@ -555,15 +555,33 @@ static void add_metadata_batch(grpc_chttp2_transport_parsing *transport_parsing,
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice, int is_last) {
grpc_chttp2_parse_state st;
size_t i;
memset(&st, 0, sizeof(st));
switch (transport_parsing->parser(transport_parsing->parser_data, &st, slice, is_last)) {
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) {
case GRPC_CHTTP2_PARSE_OK:
if (stream_parsing) {
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
}
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
become_skip_parser(transport_parsing);
if (stream_parsing) {
stream_parsing->saw_error = 1;
}
return 1;
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
return 0;
}
#if 0
if (st.end_of_stream) {
transport_parsing->incoming_stream->read_closed = 1;
maybe_finish_read(t, transport_parsing->incoming_stream, 1);
@ -576,12 +594,8 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice,
maybe_finish_read(t, transport_parsing->incoming_stream, 1);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_settings_ack_create());
}
if (st.send_ping_ack) {
gpr_slice_buffer_add(
&transport_parsing->qbuf,
grpc_chttp2_ping_create(1, transport_parsing->simple.ping.opaque_8bytes));
}
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
@ -624,19 +638,4 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *t, gpr_slice slice,
transport_parsing->global.outgoing_window_update += st.window_update;
}
}
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
become_skip_parser(transport_parsing);
cancel_stream_id(
t, transport_parsing->incoming_stream_id,
grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
GRPC_CHTTP2_INTERNAL_ERROR, 1);
return 1;
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(transport_parsing);
return 0;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
return 0;
}
#endif

@ -39,7 +39,7 @@
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
gpr_uint32 window_delta;
@ -75,7 +75,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_constants *transpor
if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE &&
stream_global->outgoing_sopb->nops == 0) {
if (!transport_constants->is_client && !stream_global->read_closed) {
if (!transport_global->is_client && !stream_global->read_closed) {
stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
stream_writing->send_closed = SEND_CLOSED;
@ -158,14 +158,14 @@ static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
grpc_chttp2_terminate_writing(transport_writing, write_status == GRPC_ENDPOINT_CB_OK);
}
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_constants *transport_constants, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_written_stream(transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->send_closed != DONT_SEND_CLOSED) {
stream_global->write_state = WRITE_STATE_SENT_CLOSE;
if (!transport_constants->is_client) {
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
grpc_chttp2_read_write_state_changed(transport_global, stream_global);

Loading…
Cancel
Save