Compiling, writing work

reviewable/pr8008/r2
Craig Tiller 9 years ago
parent 3d7c609444
commit 4e5b452147
  1. 154
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 33
      src/core/ext/transport/chttp2/transport/frame_data.c
  3. 11
      src/core/ext/transport/chttp2/transport/frame_goaway.c
  4. 15
      src/core/ext/transport/chttp2/transport/frame_ping.c
  5. 19
      src/core/ext/transport/chttp2/transport/frame_rst_stream.c
  6. 22
      src/core/ext/transport/chttp2/transport/frame_settings.c
  7. 35
      src/core/ext/transport/chttp2/transport/frame_window_update.c
  8. 29
      src/core/ext/transport/chttp2/transport/hpack_parser.c
  9. 42
      src/core/ext/transport/chttp2/transport/internal.h
  10. 610
      src/core/ext/transport/chttp2/transport/parsing.c
  11. 262
      src/core/ext/transport/chttp2/transport/stream_lists.c
  12. 557
      src/core/ext/transport/chttp2/transport/writing.c

@ -67,18 +67,20 @@ int grpc_flowctl_trace = 0;
static const grpc_transport_vtable vtable;
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void write_action_end(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void read_action_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@ -224,12 +226,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked,
t);
grpc_closure_init(&t->write_action, write_action, t);
grpc_closure_init(&t->write_action_end, write_action_end, t);
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
grpc_closure_init(&t->read_action_begin, read_action_begin, t);
grpc_closure_init(&t->read_action_locked, read_action_locked, t);
grpc_closure_init(&t->read_action_flush_locked, read_action_flush_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@ -538,27 +543,6 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
return accepting;
}
/*******************************************************************************
* LOCK MANAGEMENT
*/
#if 0
static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
start_writing(exec_ctx, t);
}
#endif
static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->check_read_ops_scheduled = false;
check_read_ops(exec_ctx, t);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "initiate_read_flush_locked");
}
/*******************************************************************************
* OUTPUT PROCESSING
*/
@ -572,7 +556,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_IDLE:
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->writing_action,
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked,
GRPC_ERROR_NONE, false);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
@ -594,37 +579,39 @@ void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
}
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("start_writing", 0);
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("write_action_begin_locked", 0);
grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
prevent_endpoint_shutdown(t);
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else {
t->write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("start_writing", 0);
GPR_TIMER_END("write_action_begin_locked", 0);
}
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
if (use_value != value) {
gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
value, use_value);
}
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting");
}
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action", 0);
grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end);
GPR_TIMER_END("write_action", 0);
}
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action_end", 0);
grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked,
GRPC_ERROR_REF(error));
GPR_TIMER_END("write_action_end", 0);
}
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
grpc_chttp2_transport *t = tp;
allow_endpoint_shutdown_locked(exec_ctx, t);
@ -646,7 +633,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
t->write_state = GRPC_CHTTP2_WRITE_STATE_WRITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->writing_action,
grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action,
GRPC_ERROR_NONE, false);
break;
}
@ -655,21 +642,20 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
static void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error) {
GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
grpc_chttp2_transport *t = gt;
grpc_combiner_execute(exec_ctx, t->combiner, &t->terminate_writing,
GRPC_ERROR_REF(error));
GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("writing_action", 0);
grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->writing_done_action);
GPR_TIMER_END("writing_action", 0);
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
if (use_value != value) {
gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
value, use_value);
}
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting");
}
}
void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
@ -1091,6 +1077,14 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* INPUT PROCESSING - GENERAL
*/
static void read_action_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->check_read_ops_scheduled = false;
check_read_ops(exec_ctx, t);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "initiate_read_flush_locked");
}
static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("check_read_ops", 0);
grpc_chttp2_stream *s;
@ -1529,15 +1523,15 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
/* Control flow:
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
GPR_TIMER_BEGIN("reading_action", 0);
grpc_chttp2_transport *t = tp;
grpc_combiner_execute(exec_ctx, t->combiner, &t->reading_action_locked,
grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked,
GRPC_ERROR_REF(error));
GPR_TIMER_END("reading_action", 0);
}
@ -1569,8 +1563,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
return error;
}
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
@ -1633,7 +1627,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin);
allow_endpoint_shutdown_locked(exec_ctx, t);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
@ -2011,5 +2005,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
reading_action(exec_ctx, t, GRPC_ERROR_NONE);
read_action_begin(exec_ctx, t, GRPC_ERROR_NONE);
}

@ -141,10 +141,10 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
grpc_error *grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -154,8 +154,8 @@ grpc_error *grpc_chttp2_data_parser_parse(
char *msg;
if (is_last && p->is_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, false, GRPC_ERROR_NONE);
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
if (cur == end) {
@ -168,7 +168,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
return GRPC_ERROR_REF(p->error);
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
stream_global->stats.incoming.framing_bytes++;
s->stats.incoming.framing_bytes++;
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
@ -181,7 +181,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)stream_global->id);
(intptr_t)s->id);
gpr_free(msg);
msg = gpr_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error =
@ -198,7 +198,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
stream_global->stats.incoming.framing_bytes++;
s->stats.incoming.framing_bytes++;
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
@ -206,7 +206,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
stream_global->stats.incoming.framing_bytes++;
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
@ -214,7 +214,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
stream_global->stats.incoming.framing_bytes++;
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
@ -222,7 +222,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
stream_global->stats.incoming.framing_bytes++;
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
@ -231,8 +231,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(exec_ctx, transport_global,
stream_global, p->frame_size,
grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
message_flags);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
@ -241,7 +240,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
stream_global->stats.incoming.data_bytes += p->frame_size;
s->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
@ -251,7 +250,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_ERROR_NONE;
} else if (remaining > p->frame_size) {
stream_global->stats.incoming.data_bytes += p->frame_size;
s->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg),
@ -267,7 +266,7 @@ grpc_error *grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
p->frame_size -= remaining;
stream_global->stats.incoming.data_bytes += remaining;
s->stats.incoming.data_bytes += remaining;
return GRPC_ERROR_NONE;
}
}

@ -67,10 +67,11 @@ grpc_error *grpc_chttp2_goaway_parser_begin_frame(grpc_chttp2_goaway_parser *p,
return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_goaway_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx,
void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -149,7 +150,7 @@ grpc_error *grpc_chttp2_goaway_parser_parse(
p->state = GRPC_CHTTP2_GOAWAY_DEBUG;
if (is_last) {
grpc_chttp2_add_incoming_goaway(
exec_ctx, transport_global, (uint32_t)p->error_code,
exec_ctx, t, (uint32_t)p->error_code,
gpr_slice_new(p->debug_data, p->debug_length, gpr_free));
p->debug_data = NULL;
}

@ -73,10 +73,10 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_ping_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -91,12 +91,11 @@ grpc_error *grpc_chttp2_ping_parser_parse(
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, transport_global, p->opaque_8bytes);
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
gpr_slice_buffer_add(&transport_global->qbuf,
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_ping_create(1, p->opaque_8bytes));
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"ping response");
grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
}

@ -85,10 +85,11 @@ grpc_error *grpc_chttp2_rst_stream_parser_begin_frame(
return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_rst_stream_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -99,7 +100,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(
cur++;
p->byte++;
}
stream_global->stats.incoming.framing_bytes += (uint64_t)(end - cur);
s->stats.incoming.framing_bytes += (uint64_t)(end - cur);
if (p->byte == 4) {
GPR_ASSERT(is_last);
@ -112,17 +113,15 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(
error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"),
GRPC_ERROR_INT_HTTP2_ERROR, reason);
grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
(grpc_chttp2_error_code)reason, stream_global->deadline);
(grpc_chttp2_error_code)reason, s->deadline);
char *status_details;
gpr_asprintf(&status_details, "Received RST_STREAM with error code %d",
reason);
gpr_slice slice_details = gpr_slice_from_copied_string(status_details);
gpr_free(status_details);
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
status_code, &slice_details);
grpc_chttp2_fake_status(exec_ctx, t, s, status_code, &slice_details);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, true, error);
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, error);
}
return GRPC_ERROR_NONE;

@ -143,10 +143,10 @@ grpc_error *grpc_chttp2_settings_parser_begin_frame(
}
}
grpc_error *grpc_chttp2_settings_parser_parse(
grpc_exec_ctx *exec_ctx, void *p,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
grpc_chttp2_settings_parser *parser = p;
const uint8_t *cur = GPR_SLICE_START_PTR(slice);
const uint8_t *end = GPR_SLICE_END_PTR(slice);
@ -164,8 +164,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(
if (is_last) {
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_settings_ack_create());
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
}
return GRPC_ERROR_NONE;
}
@ -225,9 +224,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(
break;
case GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE:
grpc_chttp2_goaway_append(
transport_global->last_incoming_stream_id, sp->error_value,
t->last_incoming_stream_id, sp->error_value,
gpr_slice_from_static_string("HTTP2 settings error"),
&transport_global->qbuf);
&t->qbuf);
gpr_asprintf(&msg, "invalid value %u passed for %s",
parser->value, sp->name);
grpc_error *err = GRPC_ERROR_CREATE(msg);
@ -237,18 +236,17 @@ grpc_error *grpc_chttp2_settings_parser_parse(
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
transport_global->initial_window_update =
t->initial_window_update =
(int64_t)parser->value - parser->incoming_settings[parser->id];
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
(int)transport_global->initial_window_update);
(int)t->initial_window_update);
}
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d",
transport_global->is_client ? "CLI" : "SVR", parser->id,
parser->value);
t->is_client ? "CLI" : "SVR", parser->id, parser->value);
}
} else if (grpc_http_trace) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",

@ -80,9 +80,8 @@ grpc_error *grpc_chttp2_window_update_parser_begin_frame(
}
grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, gpr_slice slice, int is_last) {
uint8_t *const beg = GPR_SLICE_START_PTR(slice);
uint8_t *const end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -94,8 +93,8 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
p->byte++;
}
if (stream_global != NULL) {
stream_global->stats.incoming.framing_bytes += (uint32_t)(end - cur);
if (s != NULL) {
s->stats.incoming.framing_bytes += (uint32_t)(end - cur);
}
if (p->byte == 4) {
@ -109,24 +108,24 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
}
GPR_ASSERT(is_last);
if (transport_global->incoming_stream_id != 0) {
if (stream_global != NULL) {
bool was_zero = stream_global->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_global, stream_global,
outgoing_window, received_update);
bool is_zero = stream_global->outgoing_window <= 0;
if (t->incoming_stream_id != 0) {
if (s != NULL) {
bool was_zero = s->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window,
received_update);
bool is_zero = s->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false, "stream.read_flow_control");
grpc_chttp2_become_writable(exec_ctx, t, s, false,
"stream.read_flow_control");
}
}
} else {
bool was_zero = transport_global->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_global,
outgoing_window, received_update);
bool is_zero = transport_global->outgoing_window <= 0;
bool was_zero = t->outgoing_window <= 0;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, outgoing_window,
received_update);
bool is_zero = t->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
grpc_chttp2_initiate_write(exec_ctx, t, false,
"new_global_flow_control");
}
}

@ -1571,14 +1571,15 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
return p->state(exec_ctx, p, beg, end);
}
grpc_error *grpc_chttp2_header_parser_parse(
grpc_exec_ctx *exec_ctx, void *hpack_parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, gpr_slice slice, int is_last) {
grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
void *hpack_parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0);
if (stream_global != NULL) {
stream_global->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice);
if (s != NULL) {
s->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice);
}
grpc_error *error = grpc_chttp2_hpack_parser_parse(
exec_ctx, parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice));
@ -1594,21 +1595,17 @@ grpc_error *grpc_chttp2_header_parser_parse(
}
/* need to check for null stream: this can occur if we receive an invalid
stream id on a header */
if (stream_global != NULL) {
if (s != NULL) {
if (parser->is_boundary) {
if (stream_global->header_frames_received ==
GPR_ARRAY_SIZE(stream_global->metadata_buffer)) {
if (s->header_frames_received == GPR_ARRAY_SIZE(s->metadata_buffer)) {
return GRPC_ERROR_CREATE("Too many trailer frames");
}
stream_global
->published_metadata[stream_global->header_frames_received] = true;
stream_global->header_frames_received++;
grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
stream_global);
s->published_metadata[s->header_frames_received] = true;
s->header_frames_received++;
grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s);
}
if (parser->is_eof) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global,
stream_global, true, false,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
}

@ -206,16 +206,14 @@ struct grpc_chttp2_transport {
/** maps stream id to grpc_chttp2_stream objects */
grpc_chttp2_stream_map stream_map;
/** closure to execute writing */
grpc_closure writing_action;
grpc_closure writing_done_action;
/** closure to finish writing */
grpc_closure terminate_writing;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
grpc_closure reading_action_locked;
/** closure to flush read state up the stack */
grpc_closure initiate_read_flush_locked;
grpc_closure write_action_begin_locked;
grpc_closure write_action;
grpc_closure write_action_end;
grpc_closure write_action_end_locked;
grpc_closure read_action_begin;
grpc_closure read_action_locked;
grpc_closure read_action_flush_locked;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@ -319,8 +317,24 @@ struct grpc_chttp2_transport {
grpc_status_code goaway_error;
uint32_t goaway_last_stream_index;
gpr_slice goaway_text;
/* closures to finish after writing */
grpc_closure **finish_after_writing;
size_t finish_after_writing_count;
size_t finish_after_writing_capacity;
};
typedef enum {
GRPC_CHTTP2_CALL_WHEN_SCHEDULED,
GRPC_CHTTP2_CALL_WHEN_WRITTEN,
} grpc_chttp2_call_write_cb_when;
typedef struct grpc_chttp2_write_cb {
size_t call_at_byte;
grpc_closure *closure;
grpc_chttp2_call_write_cb_when when;
} grpc_chttp2_write_cb;
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@ -350,11 +364,11 @@ struct grpc_chttp2_stream {
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
grpc_byte_stream *send_message;
grpc_closure *send_message_finished;
grpc_metadata_batch *send_trailing_metadata;
grpc_closure *send_trailing_metadata_finished;
grpc_byte_stream *fetching_send_message;
grpc_metadata_batch *recv_initial_metadata;
grpc_closure *recv_initial_metadata_ready;
grpc_byte_stream **recv_message;
@ -416,6 +430,10 @@ struct grpc_chttp2_stream {
gpr_slice fetching_slice;
size_t stream_fetched;
grpc_closure finished_fetch;
grpc_chttp2_write_cb *write_cbs;
size_t write_cb_count;
size_t write_cb_capacity;
};
/** Transport writing call flow:

@ -45,34 +45,34 @@
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
static grpc_error *init_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_header_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
int is_continuation);
static grpc_error *init_data_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_rst_stream_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_settings_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_window_update_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_ping_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_goaway_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
static grpc_error *init_skip_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
int is_header);
static grpc_error *parse_frame_slice(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
gpr_slice slice, int is_last);
grpc_error *grpc_chttp2_perform_read(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
gpr_slice slice) {
static grpc_error *init_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
int is_continuation);
static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_rst_stream_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_window_update_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_ping_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_goaway_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
int is_header);
static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, gpr_slice slice,
int is_last);
grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
gpr_slice slice) {
uint8_t *beg = GPR_SLICE_START_PTR(slice);
uint8_t *end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
@ -80,7 +80,7 @@ grpc_error *grpc_chttp2_perform_read(
if (cur == end) return GRPC_ERROR_NONE;
switch (transport_global->deframe_state) {
switch (t->deframe_state) {
case GRPC_DTS_CLIENT_PREFIX_0:
case GRPC_DTS_CLIENT_PREFIX_1:
case GRPC_DTS_CLIENT_PREFIX_2:
@ -105,25 +105,22 @@ grpc_error *grpc_chttp2_perform_read(
case GRPC_DTS_CLIENT_PREFIX_21:
case GRPC_DTS_CLIENT_PREFIX_22:
case GRPC_DTS_CLIENT_PREFIX_23:
while (cur != end && transport_global->deframe_state != GRPC_DTS_FH_0) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_global
->deframe_state]) {
while (cur != end && t->deframe_state != GRPC_DTS_FH_0) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]) {
char *msg;
gpr_asprintf(
&msg,
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d",
GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_global
->deframe_state],
(int)(uint8_t)GRPC_CHTTP2_CLIENT_CONNECT_STRING
[transport_global->deframe_state],
*cur, (int)*cur, transport_global->deframe_state);
GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state],
(int)(uint8_t)GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state],
*cur, (int)*cur, t->deframe_state);
err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
++cur;
++transport_global->deframe_state;
++t->deframe_state;
}
if (cur == end) {
return GRPC_ERROR_NONE;
@ -132,104 +129,99 @@ grpc_error *grpc_chttp2_perform_read(
dts_fh_0:
case GRPC_DTS_FH_0:
GPR_ASSERT(cur < end);
transport_global->incoming_frame_size = ((uint32_t)*cur) << 16;
t->incoming_frame_size = ((uint32_t)*cur) << 16;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_1;
t->deframe_state = GRPC_DTS_FH_1;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_1:
GPR_ASSERT(cur < end);
transport_global->incoming_frame_size |= ((uint32_t)*cur) << 8;
t->incoming_frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_2;
t->deframe_state = GRPC_DTS_FH_2;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_2:
GPR_ASSERT(cur < end);
transport_global->incoming_frame_size |= *cur;
t->incoming_frame_size |= *cur;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_3;
t->deframe_state = GRPC_DTS_FH_3;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_3:
GPR_ASSERT(cur < end);
transport_global->incoming_frame_type = *cur;
t->incoming_frame_type = *cur;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_4;
t->deframe_state = GRPC_DTS_FH_4;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_4:
GPR_ASSERT(cur < end);
transport_global->incoming_frame_flags = *cur;
t->incoming_frame_flags = *cur;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_5;
t->deframe_state = GRPC_DTS_FH_5;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_5:
GPR_ASSERT(cur < end);
transport_global->incoming_stream_id = (((uint32_t)*cur) & 0x7f) << 24;
t->incoming_stream_id = (((uint32_t)*cur) & 0x7f) << 24;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_6;
t->deframe_state = GRPC_DTS_FH_6;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_6:
GPR_ASSERT(cur < end);
transport_global->incoming_stream_id |= ((uint32_t)*cur) << 16;
t->incoming_stream_id |= ((uint32_t)*cur) << 16;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_7;
t->deframe_state = GRPC_DTS_FH_7;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_7:
GPR_ASSERT(cur < end);
transport_global->incoming_stream_id |= ((uint32_t)*cur) << 8;
t->incoming_stream_id |= ((uint32_t)*cur) << 8;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_8;
t->deframe_state = GRPC_DTS_FH_8;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_8:
GPR_ASSERT(cur < end);
transport_global->incoming_stream_id |= ((uint32_t)*cur);
transport_global->deframe_state = GRPC_DTS_FRAME;
err = init_frame_parser(exec_ctx, transport_global);
t->incoming_stream_id |= ((uint32_t)*cur);
t->deframe_state = GRPC_DTS_FRAME;
err = init_frame_parser(exec_ctx, t);
if (err != GRPC_ERROR_NONE) {
return err;
}
if (transport_global->incoming_stream_id != 0 &&
transport_global->incoming_stream_id >
transport_global->last_incoming_stream_id) {
transport_global->last_incoming_stream_id =
transport_global->incoming_stream_id;
if (t->incoming_stream_id != 0 &&
t->incoming_stream_id > t->last_incoming_stream_id) {
t->last_incoming_stream_id = t->incoming_stream_id;
}
if (transport_global->incoming_frame_size == 0) {
err =
parse_frame_slice(exec_ctx, transport_global, gpr_empty_slice(), 1);
if (t->incoming_frame_size == 0) {
err = parse_frame_slice(exec_ctx, t, gpr_empty_slice(), 1);
if (err != GRPC_ERROR_NONE) {
return err;
}
transport_global->incoming_stream = NULL;
t->incoming_stream = NULL;
if (++cur == end) {
transport_global->deframe_state = GRPC_DTS_FH_0;
t->deframe_state = GRPC_DTS_FH_0;
return GRPC_ERROR_NONE;
}
goto dts_fh_0; /* loop */
} else if (transport_global->incoming_frame_size >
transport_global
->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {
} else if (t->incoming_frame_size >
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {
char *msg;
gpr_asprintf(
&msg, "Frame size %d is larger than max frame size %d",
transport_global->incoming_frame_size,
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]);
gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d",
t->incoming_frame_size,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]);
err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
@ -240,41 +232,39 @@ grpc_error *grpc_chttp2_perform_read(
/* fallthrough */
case GRPC_DTS_FRAME:
GPR_ASSERT(cur < end);
if ((uint32_t)(end - cur) == transport_global->incoming_frame_size) {
err = parse_frame_slice(exec_ctx, transport_global,
if ((uint32_t)(end - cur) == t->incoming_frame_size) {
err = parse_frame_slice(exec_ctx, t,
gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
1);
if (err != GRPC_ERROR_NONE) {
return err;
}
transport_global->deframe_state = GRPC_DTS_FH_0;
transport_global->incoming_stream = NULL;
t->deframe_state = GRPC_DTS_FH_0;
t->incoming_stream = NULL;
return GRPC_ERROR_NONE;
} else if ((uint32_t)(end - cur) >
transport_global->incoming_frame_size) {
} else if ((uint32_t)(end - cur) > t->incoming_frame_size) {
size_t cur_offset = (size_t)(cur - beg);
err = parse_frame_slice(
exec_ctx, transport_global,
gpr_slice_sub_no_ref(
slice, cur_offset,
cur_offset + transport_global->incoming_frame_size),
exec_ctx, t,
gpr_slice_sub_no_ref(slice, cur_offset,
cur_offset + t->incoming_frame_size),
1);
if (err != GRPC_ERROR_NONE) {
return err;
}
cur += transport_global->incoming_frame_size;
transport_global->incoming_stream = NULL;
cur += t->incoming_frame_size;
t->incoming_stream = NULL;
goto dts_fh_0; /* loop */
} else {
err = parse_frame_slice(exec_ctx, transport_global,
err = parse_frame_slice(exec_ctx, t,
gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
0);
if (err != GRPC_ERROR_NONE) {
return err;
}
transport_global->incoming_frame_size -= (uint32_t)(end - cur);
t->incoming_frame_size -= (uint32_t)(end - cur);
return GRPC_ERROR_NONE;
}
GPR_UNREACHABLE_CODE(return 0);
@ -283,73 +273,68 @@ grpc_error *grpc_chttp2_perform_read(
GPR_UNREACHABLE_CODE(return 0);
}
static grpc_error *init_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
if (transport_global->is_first_frame &&
transport_global->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
static grpc_error *init_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (t->is_first_frame &&
t->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
char *msg;
gpr_asprintf(
&msg, "Expected SETTINGS frame as the first frame, got frame type %d",
transport_global->incoming_frame_type);
t->incoming_frame_type);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
transport_global->is_first_frame = false;
if (transport_global->expect_continuation_stream_id != 0) {
if (transport_global->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
t->is_first_frame = false;
if (t->expect_continuation_stream_id != 0) {
if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
char *msg;
gpr_asprintf(&msg, "Expected CONTINUATION frame, got frame type %02x",
transport_global->incoming_frame_type);
t->incoming_frame_type);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
if (transport_global->expect_continuation_stream_id !=
transport_global->incoming_stream_id) {
if (t->expect_continuation_stream_id != t->incoming_stream_id) {
char *msg;
gpr_asprintf(
&msg,
"Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
"grpc_chttp2_stream %08x",
transport_global->expect_continuation_stream_id,
transport_global->incoming_stream_id);
t->expect_continuation_stream_id, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
return init_header_frame_parser(exec_ctx, transport_global, 1);
return init_header_frame_parser(exec_ctx, t, 1);
}
switch (transport_global->incoming_frame_type) {
switch (t->incoming_frame_type) {
case GRPC_CHTTP2_FRAME_DATA:
return init_data_frame_parser(exec_ctx, transport_global);
return init_data_frame_parser(exec_ctx, t);
case GRPC_CHTTP2_FRAME_HEADER:
return init_header_frame_parser(exec_ctx, transport_global, 0);
return init_header_frame_parser(exec_ctx, t, 0);
case GRPC_CHTTP2_FRAME_CONTINUATION:
return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame");
case GRPC_CHTTP2_FRAME_RST_STREAM:
return init_rst_stream_parser(exec_ctx, transport_global);
return init_rst_stream_parser(exec_ctx, t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(exec_ctx, transport_global);
return init_settings_frame_parser(exec_ctx, t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
return init_window_update_frame_parser(exec_ctx, transport_global);
return init_window_update_frame_parser(exec_ctx, t);
case GRPC_CHTTP2_FRAME_PING:
return init_ping_parser(exec_ctx, transport_global);
return init_ping_parser(exec_ctx, t);
case GRPC_CHTTP2_FRAME_GOAWAY:
return init_goaway_parser(exec_ctx, transport_global);
return init_goaway_parser(exec_ctx, t);
default:
if (grpc_http_trace) {
gpr_log(GPR_ERROR, "Unknown frame type %02x",
transport_global->incoming_frame_type);
gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
}
return init_skip_frame_parser(exec_ctx, transport_global, 0);
return init_skip_frame_parser(exec_ctx, t, 0);
}
}
static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
gpr_slice slice, int is_last) {
return GRPC_ERROR_NONE;
}
@ -358,101 +343,94 @@ static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem *md) {
GRPC_MDELEM_UNREF(md);
}
static grpc_error *init_skip_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
int is_header) {
static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
int is_header) {
if (is_header) {
uint8_t is_eoh = transport_global->expect_continuation_stream_id != 0;
transport_global->parser = grpc_chttp2_header_parser_parse;
transport_global->parser_data = &transport_global->hpack_parser;
transport_global->hpack_parser.on_header = skip_header;
transport_global->hpack_parser.on_header_user_data = NULL;
transport_global->hpack_parser.is_boundary = is_eoh;
transport_global->hpack_parser.is_eof =
(uint8_t)(is_eoh ? transport_global->header_eof : 0);
uint8_t is_eoh = t->expect_continuation_stream_id != 0;
t->parser = grpc_chttp2_header_parser_parse;
t->parser_data = &t->hpack_parser;
t->hpack_parser.on_header = skip_header;
t->hpack_parser.on_header_user_data = NULL;
t->hpack_parser.is_boundary = is_eoh;
t->hpack_parser.is_eof = (uint8_t)(is_eoh ? t->header_eof : 0);
} else {
transport_global->parser = skip_parser;
t->parser = skip_parser;
}
return GRPC_ERROR_NONE;
}
void grpc_chttp2_parsing_become_skip_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
init_skip_frame_parser(
exec_ctx, transport_global,
transport_global->parser == grpc_chttp2_header_parser_parse);
void grpc_chttp2_parsing_become_skip_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
init_skip_frame_parser(exec_ctx, t,
t->parser == grpc_chttp2_header_parser_parse);
}
static grpc_error *update_incoming_window(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
uint32_t incoming_frame_size = transport_global->incoming_frame_size;
if (incoming_frame_size > transport_global->incoming_window) {
static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
uint32_t incoming_frame_size = t->incoming_frame_size;
if (incoming_frame_size > t->incoming_window) {
char *msg;
gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
transport_global->incoming_frame_size,
transport_global->incoming_window);
t->incoming_frame_size, t->incoming_window);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
if (incoming_frame_size > stream_global->incoming_window) {
if (incoming_frame_size > s->incoming_window) {
char *msg;
gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
transport_global->incoming_frame_size,
stream_global->incoming_window);
t->incoming_frame_size, s->incoming_window);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_global, incoming_window,
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_global, stream_global,
incoming_window, incoming_frame_size);
stream_global->received_bytes += incoming_frame_size;
stream_global->max_recv_bytes -=
(uint32_t)GPR_MIN(stream_global->max_recv_bytes, incoming_frame_size);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window,
incoming_frame_size);
s->received_bytes += incoming_frame_size;
s->max_recv_bytes -=
(uint32_t)GPR_MIN(s->max_recv_bytes, incoming_frame_size);
return GRPC_ERROR_NONE;
}
static grpc_error *init_data_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
grpc_chttp2_stream_global *stream_global = grpc_chttp2_parsing_lookup_stream(
transport_global, transport_global->incoming_stream_id);
static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
if (stream_global == NULL) {
return init_skip_frame_parser(exec_ctx, transport_global, 0);
if (s == NULL) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
stream_global->stats.incoming.framing_bytes += 9;
if (stream_global->read_closed) {
return init_skip_frame_parser(exec_ctx, transport_global, 0);
s->stats.incoming.framing_bytes += 9;
if (s->read_closed) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
err = update_incoming_window(exec_ctx, transport_global, stream_global);
err = update_incoming_window(exec_ctx, t, s);
}
if (err == GRPC_ERROR_NONE) {
err = grpc_chttp2_data_parser_begin_frame(
&stream_global->data_parser, transport_global->incoming_frame_flags,
stream_global->id);
err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
t->incoming_frame_flags, s->id);
}
if (err == GRPC_ERROR_NONE) {
transport_global->incoming_stream = stream_global;
transport_global->parser = grpc_chttp2_data_parser_parse;
transport_global->parser_data = &stream_global->data_parser;
t->incoming_stream = s;
t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE;
} else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) {
/* handle stream errors by closing the stream */
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
true, false, err);
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, err);
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR,
&stream_global->stats.outgoing));
return init_skip_frame_parser(exec_ctx, transport_global, 0);
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
return init_skip_frame_parser(exec_ctx, t, 0);
} else {
return err;
}
@ -462,21 +440,20 @@ static void free_timeout(void *p) { gpr_free(p); }
static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdelem *md) {
grpc_chttp2_transport_global *transport_global = tp;
grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream;
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s = t->incoming_stream;
GPR_TIMER_BEGIN("on_initial_header", 0);
GPR_ASSERT(stream_global);
GPR_ASSERT(s != NULL);
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", stream_global->id,
transport_global->is_client ? "CLI" : "SVR",
GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
/* TODO(ctiller): check for a status like " 0" */
stream_global->seen_error = true;
s->seen_error = true;
}
if (md->key == GRPC_MDSTR_GRPC_TIMEOUT) {
@ -493,31 +470,29 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&stream_global->metadata_buffer[0],
&s->metadata_buffer[0],
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
GRPC_MDELEM_UNREF(md);
} else {
const size_t new_size =
stream_global->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
const size_t metadata_size_limit =
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (new_size > metadata_size_limit) {
gpr_log(GPR_DEBUG,
"received initial metadata size exceeds limit (%" PRIuPTR
" vs. %" PRIuPTR ")",
new_size, metadata_size_limit);
grpc_chttp2_cancel_stream(
exec_ctx, transport_global, stream_global,
exec_ctx, t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE("received initial metadata size exceeds limit"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_global);
stream_global->seen_error = true;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
GRPC_MDELEM_UNREF(md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(
&stream_global->metadata_buffer[0], md);
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
}
}
@ -526,234 +501,213 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdelem *md) {
grpc_chttp2_transport_global *transport_global = tp;
grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream;
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s = t->incoming_stream;
GPR_TIMER_BEGIN("on_trailing_header", 0);
GPR_ASSERT(stream_global);
GPR_ASSERT(s != NULL);
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_global->id,
transport_global->is_client ? "CLI" : "SVR",
GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
/* TODO(ctiller): check for a status like " 0" */
stream_global->seen_error = true;
s->seen_error = true;
}
const size_t new_size =
stream_global->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md);
const size_t new_size = s->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md);
const size_t metadata_size_limit =
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (new_size > metadata_size_limit) {
gpr_log(GPR_DEBUG,
"received trailing metadata size exceeds limit (%" PRIuPTR
" vs. %" PRIuPTR ")",
new_size, metadata_size_limit);
grpc_chttp2_cancel_stream(
exec_ctx, transport_global, stream_global,
exec_ctx, t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE("received trailing metadata size exceeds limit"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_global);
stream_global->seen_error = true;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
GRPC_MDELEM_UNREF(md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&stream_global->metadata_buffer[1],
md);
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
}
GPR_TIMER_END("on_trailing_header", 0);
}
static grpc_error *init_header_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
int is_continuation) {
uint8_t is_eoh = (transport_global->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
int via_accept = 0;
grpc_chttp2_stream_global *stream_global;
static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
int is_continuation) {
uint8_t is_eoh =
(t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
grpc_chttp2_stream *s;
/* TODO(ctiller): when to increment header_frames_received? */
if (is_eoh) {
transport_global->expect_continuation_stream_id = 0;
t->expect_continuation_stream_id = 0;
} else {
transport_global->expect_continuation_stream_id =
transport_global->incoming_stream_id;
t->expect_continuation_stream_id = t->incoming_stream_id;
}
if (!is_continuation) {
transport_global->header_eof = (transport_global->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
t->header_eof =
(t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
}
/* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
stream_global = grpc_chttp2_parsing_lookup_stream(
transport_global, transport_global->incoming_stream_id);
if (stream_global == NULL) {
s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
if (s == NULL) {
if (is_continuation) {
gpr_log(GPR_ERROR,
"grpc_chttp2_stream disbanded before CONTINUATION received");
return init_skip_frame_parser(exec_ctx, transport_global, 1);
return init_skip_frame_parser(exec_ctx, t, 1);
}
if (transport_global->is_client) {
if ((transport_global->incoming_stream_id & 1) &&
transport_global->incoming_stream_id <
transport_global->next_stream_id) {
if (t->is_client) {
if ((t->incoming_stream_id & 1) &&
t->incoming_stream_id < t->next_stream_id) {
/* this is an old (probably cancelled) grpc_chttp2_stream */
} else {
gpr_log(GPR_ERROR,
"ignoring new grpc_chttp2_stream creation on client");
}
return init_skip_frame_parser(exec_ctx, transport_global, 1);
} else if (transport_global->last_incoming_stream_id >
transport_global->incoming_stream_id) {
return init_skip_frame_parser(exec_ctx, t, 1);
} else if (t->last_incoming_stream_id > t->incoming_stream_id) {
gpr_log(GPR_ERROR,
"ignoring out of order new grpc_chttp2_stream request on server; "
"last grpc_chttp2_stream "
"id=%d, new grpc_chttp2_stream id=%d",
transport_global->last_incoming_stream_id,
transport_global->incoming_stream_id);
return init_skip_frame_parser(exec_ctx, transport_global, 1);
} else if ((transport_global->incoming_stream_id & 1) == 0) {
t->last_incoming_stream_id, t->incoming_stream_id);
return init_skip_frame_parser(exec_ctx, t, 1);
} else if ((t->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR,
"ignoring grpc_chttp2_stream with non-client generated index %d",
transport_global->incoming_stream_id);
return init_skip_frame_parser(exec_ctx, transport_global, 1);
t->incoming_stream_id);
return init_skip_frame_parser(exec_ctx, t, 1);
}
stream_global = transport_global->incoming_stream =
grpc_chttp2_parsing_accept_stream(exec_ctx, transport_global,
transport_global->incoming_stream_id);
if (stream_global == NULL) {
s = t->incoming_stream =
grpc_chttp2_parsing_accept_stream(exec_ctx, t, t->incoming_stream_id);
if (s == NULL) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
return init_skip_frame_parser(exec_ctx, transport_global, 1);
return init_skip_frame_parser(exec_ctx, t, 1);
}
via_accept = 1;
} else {
transport_global->incoming_stream = stream_global;
t->incoming_stream = s;
}
GPR_ASSERT(stream_global != NULL && (via_accept == 0 || via_accept == 1));
stream_global->stats.incoming.framing_bytes += 9;
if (stream_global->read_closed) {
GPR_ASSERT(s != NULL);
s->stats.incoming.framing_bytes += 9;
if (s->read_closed) {
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
transport_global->incoming_stream = NULL;
return init_skip_frame_parser(exec_ctx, transport_global, 1);
t->incoming_stream = NULL;
return init_skip_frame_parser(exec_ctx, t, 1);
}
transport_global->parser = grpc_chttp2_header_parser_parse;
transport_global->parser_data = &transport_global->hpack_parser;
switch (stream_global->header_frames_received) {
t->parser = grpc_chttp2_header_parser_parse;
t->parser_data = &t->hpack_parser;
switch (s->header_frames_received) {
case 0:
transport_global->hpack_parser.on_header = on_initial_header;
t->hpack_parser.on_header = on_initial_header;
break;
case 1:
transport_global->hpack_parser.on_header = on_trailing_header;
t->hpack_parser.on_header = on_trailing_header;
break;
case 2:
gpr_log(GPR_ERROR, "too many header frames received");
return init_skip_frame_parser(exec_ctx, transport_global, 1);
return init_skip_frame_parser(exec_ctx, t, 1);
}
transport_global->hpack_parser.on_header_user_data = transport_global;
transport_global->hpack_parser.is_boundary = is_eoh;
transport_global->hpack_parser.is_eof =
(uint8_t)(is_eoh ? transport_global->header_eof : 0);
if (!is_continuation && (transport_global->incoming_frame_flags &
GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
grpc_chttp2_hpack_parser_set_has_priority(&transport_global->hpack_parser);
t->hpack_parser.on_header_user_data = t;
t->hpack_parser.is_boundary = is_eoh;
t->hpack_parser.is_eof = (uint8_t)(is_eoh ? t->header_eof : 0);
if (!is_continuation &&
(t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
}
return GRPC_ERROR_NONE;
}
static grpc_error *init_window_update_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
static grpc_error *init_window_update_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_error *err = grpc_chttp2_window_update_parser_begin_frame(
&transport_global->simple.window_update,
transport_global->incoming_frame_size,
transport_global->incoming_frame_flags);
&t->simple.window_update, t->incoming_frame_size,
t->incoming_frame_flags);
if (err != GRPC_ERROR_NONE) return err;
if (transport_global->incoming_stream_id != 0) {
grpc_chttp2_stream_global *stream_global =
transport_global->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_global, transport_global->incoming_stream_id);
if (stream_global == NULL) {
return init_skip_frame_parser(exec_ctx, transport_global, 0);
if (t->incoming_stream_id != 0) {
grpc_chttp2_stream *s = t->incoming_stream =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
if (s == NULL) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
stream_global->stats.incoming.framing_bytes += 9;
s->stats.incoming.framing_bytes += 9;
}
transport_global->parser = grpc_chttp2_window_update_parser_parse;
transport_global->parser_data = &transport_global->simple.window_update;
t->parser = grpc_chttp2_window_update_parser_parse;
t->parser_data = &t->simple.window_update;
return GRPC_ERROR_NONE;
}
static grpc_error *init_ping_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
static grpc_error *init_ping_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_error *err = grpc_chttp2_ping_parser_begin_frame(
&transport_global->simple.ping, transport_global->incoming_frame_size,
transport_global->incoming_frame_flags);
&t->simple.ping, t->incoming_frame_size, t->incoming_frame_flags);
if (err != GRPC_ERROR_NONE) return err;
transport_global->parser = grpc_chttp2_ping_parser_parse;
transport_global->parser_data = &transport_global->simple.ping;
t->parser = grpc_chttp2_ping_parser_parse;
t->parser_data = &t->simple.ping;
return GRPC_ERROR_NONE;
}
static grpc_error *init_rst_stream_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
static grpc_error *init_rst_stream_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_error *err = grpc_chttp2_rst_stream_parser_begin_frame(
&transport_global->simple.rst_stream,
transport_global->incoming_frame_size,
transport_global->incoming_frame_flags);
&t->simple.rst_stream, t->incoming_frame_size, t->incoming_frame_flags);
if (err != GRPC_ERROR_NONE) return err;
grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream =
grpc_chttp2_parsing_lookup_stream(transport_global,
transport_global->incoming_stream_id);
if (!transport_global->incoming_stream) {
return init_skip_frame_parser(exec_ctx, transport_global, 0);
grpc_chttp2_stream *s = t->incoming_stream =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
if (!t->incoming_stream) {
return init_skip_frame_parser(exec_ctx, t, 0);
}
stream_global->stats.incoming.framing_bytes += 9;
transport_global->parser = grpc_chttp2_rst_stream_parser_parse;
transport_global->parser_data = &transport_global->simple.rst_stream;
s->stats.incoming.framing_bytes += 9;
t->parser = grpc_chttp2_rst_stream_parser_parse;
t->parser_data = &t->simple.rst_stream;
return GRPC_ERROR_NONE;
}
static grpc_error *init_goaway_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
static grpc_error *init_goaway_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_error *err = grpc_chttp2_goaway_parser_begin_frame(
&transport_global->goaway_parser, transport_global->incoming_frame_size,
transport_global->incoming_frame_flags);
&t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
if (err != GRPC_ERROR_NONE) return err;
transport_global->parser = grpc_chttp2_goaway_parser_parse;
transport_global->parser_data = &transport_global->goaway_parser;
t->parser = grpc_chttp2_goaway_parser_parse;
t->parser_data = &t->goaway_parser;
return GRPC_ERROR_NONE;
}
static grpc_error *init_settings_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
if (transport_global->incoming_stream_id != 0) {
static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (t->incoming_stream_id != 0) {
return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream");
}
grpc_error *err = grpc_chttp2_settings_parser_begin_frame(
&transport_global->simple.settings, transport_global->incoming_frame_size,
transport_global->incoming_frame_flags,
transport_global->settings[GRPC_PEER_SETTINGS]);
&t->simple.settings, t->incoming_frame_size, t->incoming_frame_flags,
t->settings[GRPC_PEER_SETTINGS]);
if (err != GRPC_ERROR_NONE) {
return err;
}
if (transport_global->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
memcpy(transport_global->settings[GRPC_ACKED_SETTINGS],
transport_global->settings[GRPC_SENT_SETTINGS],
if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_chttp2_hptbl_set_max_bytes(
&transport_global->hpack_parser.table,
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
transport_global->sent_local_settings = 0;
&t->hpack_parser.table,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
t->sent_local_settings = 0;
}
transport_global->parser = grpc_chttp2_settings_parser_parse;
transport_global->parser_data = &transport_global->simple.settings;
t->parser = grpc_chttp2_settings_parser_parse;
t->parser_data = &t->simple.settings;
return GRPC_ERROR_NONE;
}
@ -763,17 +717,14 @@ static int is_window_update_legal(int64_t window_update, int64_t window) {
}
*/
static grpc_error *parse_frame_slice(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
gpr_slice slice, int is_last) {
grpc_chttp2_stream_global *stream_global = transport_global->incoming_stream;
grpc_error *err =
transport_global->parser(exec_ctx, transport_global->parser_data,
transport_global, stream_global, slice, is_last);
static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, gpr_slice slice,
int is_last) {
grpc_chttp2_stream *s = t->incoming_stream;
grpc_error *err = t->parser(exec_ctx, t->parser_data, t, s, slice, is_last);
if (err == GRPC_ERROR_NONE) {
if (stream_global != NULL) {
grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
stream_global);
if (s != NULL) {
grpc_chttp2_list_add_check_read_ops(exec_ctx, t, s);
}
return err;
} else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) {
@ -782,14 +733,13 @@ static grpc_error *parse_frame_slice(
gpr_log(GPR_ERROR, "%s", msg);
grpc_error_free_string(msg);
}
grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_global);
if (stream_global) {
stream_global->forced_close_error = err;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
if (s) {
s->forced_close_error = err;
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(transport_global->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR,
&stream_global->stats.outgoing));
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
} else {
GRPC_ERROR_UNREF(err);
}

@ -35,27 +35,6 @@
#include <grpc/support/log.h>
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
#define STREAM_FROM_WRITING(sw) \
((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, writing)))
#define TRANSPORT_FROM_PARSING(tp) \
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
#define STREAM_FROM_PARSING(sp) \
((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, parsing)))
/* core list management */
static int stream_list_empty(grpc_chttp2_transport *t,
@ -139,219 +118,114 @@ static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
/* wrappers for specializations */
bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
GPR_ASSERT(s->id != 0);
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE);
if (r != 0) {
*stream_global = &stream->global;
*stream_writing = &stream->writing;
}
return r;
int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING));
void grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
GPR_ASSERT(stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING));
}
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing) {
return !stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing),
GRPC_CHTTP2_LIST_WRITING);
int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_pop_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITING);
if (r != 0) {
*stream_writing = &stream->writing;
}
return r;
int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
}
void grpc_chttp2_list_add_written_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITTEN);
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITTEN);
}
int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITTEN);
if (r != 0) {
*stream_global = &stream->global;
*stream_writing = &stream->writing;
}
return r;
int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITTEN);
}
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
void grpc_chttp2_list_add_check_read_ops(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
if (!t->executor.check_read_ops_scheduled) {
GRPC_CHTTP2_REF_TRANSPORT(TRANSPORT_FROM_GLOBAL(transport_global),
"initiate_read_flush_locked");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
&t->initiate_read_flush_locked,
GRPC_ERROR_NONE, false);
t->executor.check_read_ops_scheduled = true;
void grpc_chttp2_list_add_check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if (!t->check_read_ops_scheduled) {
GRPC_CHTTP2_REF_TRANSPORT(t, "initiate_read_flush_locked");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->read_action_flush_locked, GRPC_ERROR_NONE,
false);
t->check_read_ops_scheduled = true;
}
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
stream_list_add(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
bool grpc_chttp2_list_remove_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
bool grpc_chttp2_list_remove_check_read_ops(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
int grpc_chttp2_list_pop_check_read_ops(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id);
void grpc_chttp2_list_add_writing_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
grpc_chttp2_stream *stream = s;
gpr_log(GPR_DEBUG, "writing stalled %d", s->id);
if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing_stalled");
}
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
stream_list_add(t, stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
bool grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
bool out = false;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled",
stream->global.id);
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
while (
stream_list_pop(t, &s, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", s->id);
grpc_chttp2_list_add_stalled_by_transport(t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing_stalled");
out = true;
}
return out;
}
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
void grpc_chttp2_list_remove_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
void grpc_chttp2_list_add_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING);
int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
int grpc_chttp2_list_pop_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}

@ -40,343 +40,360 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing);
static void queue_write_callback(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_closure **c,
grpc_error *error,
grpc_chttp2_call_write_cb_when when) {
switch (when) {
case GRPC_CHTTP2_CALL_WHEN_SCHEDULED:
grpc_chttp2_complete_closure_step(exec_ctx, t, s, c, error);
break;
case GRPC_CHTTP2_CALL_WHEN_WRITTEN:
int grpc_chttp2_unlocking_check_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
break;
}
}
bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0);
GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0);
if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings) {
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
&transport_writing->outbuf,
&t->outbuf,
grpc_chttp2_settings_create(
transport_global->settings[GRPC_SENT_SETTINGS],
transport_global->settings[GRPC_LOCAL_SETTINGS],
transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
transport_global->force_send_settings = 0;
transport_global->dirtied_local_settings = 0;
transport_global->sent_local_settings = 1;
t->settings[GRPC_SENT_SETTINGS], t->settings[GRPC_LOCAL_SETTINGS],
t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
t->force_send_settings = 0;
t->dirtied_local_settings = 0;
t->sent_local_settings = 1;
}
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_move_into(&transport_global->qbuf,
&transport_writing->outbuf);
GPR_ASSERT(transport_global->qbuf.count == 0);
gpr_slice_buffer_move_into(&t->qbuf, &t->outbuf);
GPR_ASSERT(t->qbuf.count == 0);
grpc_chttp2_hpack_compressor_set_max_table_size(
&transport_writing->hpack_compressor,
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
if (transport_writing->outgoing_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) {
grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
false, "transport.read_flow_control");
&t->hpack_compressor,
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
if (t->outgoing_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
grpc_chttp2_become_writable(exec_ctx, t, s, false,
"transport.read_flow_control");
}
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
bool sent_initial_metadata = stream_writing->sent_initial_metadata;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
bool sent_initial_metadata = s->sent_initial_metadata;
bool become_writable = false;
stream_writing->id = stream_global->id;
stream_writing->read_closed = stream_global->read_closed;
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing,
outgoing_window, stream_global,
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", t, s, outgoing_window, s,
outgoing_window);
if (!sent_initial_metadata && stream_global->send_initial_metadata) {
stream_writing->send_initial_metadata =
stream_global->send_initial_metadata;
stream_global->send_initial_metadata = NULL;
/* send initial metadata if it's available */
if (!sent_initial_metadata && s->send_initial_metadata) {
grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
s->send_initial_metadata, 0, &s->stats.outgoing,
&t->outbuf);
s->send_initial_metadata = NULL;
become_writable = true;
sent_initial_metadata = true;
}
/* send any window updates */
if (s->announce_window > 0 && s->send_initial_metadata == NULL) {
uint32_t announce = s->announce_window;
gpr_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(
s->id, s->announce_window, &s->stats.outgoing));
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
/* TODO(ctiller): why? */
s->announce_window = 0;
}
if (sent_initial_metadata) {
if (stream_global->send_message != NULL) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0) {
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(s->outgoing_window, t->outgoing_window));
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
bool is_last_data_frame =
s->fetching_send_message == NULL &&
send_bytes == s->flow_controlled_buffer.length;
bool is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = 1;
}
#if 0
if (s->send_message != NULL) {
gpr_slice hdr = gpr_slice_malloc(5);
uint8_t *p = GPR_SLICE_START_PTR(hdr);
uint32_t len = stream_global->send_message->length;
GPR_ASSERT(stream_writing->send_message == NULL);
p[0] = (stream_global->send_message->flags &
GRPC_WRITE_INTERNAL_COMPRESS) != 0;
uint32_t len = s->send_message->length;
GPR_ASSERT(s->send_message == NULL);
p[0] = (s->send_message->flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
p[1] = (uint8_t)(len >> 24);
p[2] = (uint8_t)(len >> 16);
p[3] = (uint8_t)(len >> 8);
p[4] = (uint8_t)(len);
gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr);
gpr_slice_buffer_add(&s->flow_controlled_buffer, hdr);
if (stream_global->send_message->length > 0) {
stream_writing->send_message = stream_global->send_message;
s->send_message = stream_global->send_message;
} else {
stream_writing->send_message = NULL;
s->send_message = NULL;
}
stream_writing->stream_fetched = 0;
stream_global->send_message = NULL;
s->stream_fetched = 0;
s->send_message = NULL;
}
if ((stream_writing->send_message != NULL ||
stream_writing->flow_controlled_buffer.length > 0) &&
stream_writing->outgoing_window > 0) {
if ((s->send_message != NULL || s->flow_controlled_buffer.length > 0) &&
s->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
become_writable = true;
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_stalled_by_transport(t, s);
}
}
if (stream_global->send_trailing_metadata) {
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
#endif
if (stream_global->send_trailing_metadata) {
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
become_writable = true;
}
}
if (!stream_global->read_closed &&
stream_global->unannounced_incoming_window_for_writing > 1024) {
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
become_writable = true;
}
}
if (!stream_global->read_closed &&
stream_global->unannounced_incoming_window_for_writing > 1024) {
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
become_writable = true;
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->announce_incoming_window > 0) {
uint32_t announced = (uint32_t)GPR_MIN(
transport_global->announce_incoming_window, UINT32_MAX);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
announce_incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->announce_incoming_window > 0) {
uint32_t announced = (uint32_t)GPR_MIN(
transport_global->announce_incoming_window, UINT32_MAX);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
announce_incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
}
GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint) {
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
void grpc_chttp2_perform_writes(
grpc_exec_ctx * exec_ctx,
grpc_chttp2_transport_writing * transport_writing,
grpc_endpoint * endpoint) {
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
finalize_outbuf(exec_ctx, transport_writing);
finalize_outbuf(exec_ctx, transport_writing);
GPR_ASSERT(endpoint);
GPR_ASSERT(endpoint);
if (transport_writing->outbuf.count > 0) {
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
NULL);
if (transport_writing->outbuf.count > 0) {
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb,
GRPC_ERROR_NONE, NULL);
}
}
}
static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
static void finalize_outbuf(
grpc_exec_ctx * exec_ctx,
grpc_chttp2_transport_writing * transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
GPR_TIMER_BEGIN("finalize_outbuf", 0);
GPR_TIMER_BEGIN("finalize_outbuf", 0);
bool is_first_data_frame = true;
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(stream_writing->outgoing_window,
transport_writing->outgoing_window));
/* send initial metadata if it's available */
if (stream_writing->send_initial_metadata != NULL) {
grpc_chttp2_encode_header(
&transport_writing->hpack_compressor, stream_writing->id,
stream_writing->send_initial_metadata, 0, &stream_writing->stats,
&transport_writing->outbuf);
stream_writing->send_initial_metadata = NULL;
stream_writing->sent_initial_metadata = 1;
}
/* send any window updates */
if (stream_writing->announce_window > 0 &&
stream_writing->send_initial_metadata == NULL) {
uint32_t announce = stream_writing->announce_window;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_writing->id,
stream_writing->announce_window,
&stream_writing->stats));
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing,
announce_window, announce);
stream_writing->announce_window = 0;
}
/* fetch any body bytes */
while (!stream_writing->fetching && stream_writing->send_message &&
stream_writing->flow_controlled_buffer.length < max_outgoing &&
stream_writing->stream_fetched <
stream_writing->send_message->length) {
if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
&stream_writing->fetching_slice, max_outgoing,
&stream_writing->finished_fetch)) {
stream_writing->stream_fetched +=
GPR_SLICE_LENGTH(stream_writing->fetching_slice);
if (stream_writing->stream_fetched ==
stream_writing->send_message->length) {
stream_writing->send_message = NULL;
bool is_first_data_frame = true;
while (grpc_chttp2_list_pop_writing_stream(transport_writing,
&stream_writing)) {
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(stream_writing->outgoing_window,
transport_writing->outgoing_window));
/* fetch any body bytes */
while (!stream_writing->fetching && stream_writing->send_message &&
stream_writing->flow_controlled_buffer.length < max_outgoing &&
stream_writing->stream_fetched <
stream_writing->send_message->length) {
if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
&stream_writing->fetching_slice, max_outgoing,
&stream_writing->finished_fetch)) {
stream_writing->stream_fetched +=
GPR_SLICE_LENGTH(stream_writing->fetching_slice);
if (stream_writing->stream_fetched ==
stream_writing->send_message->length) {
stream_writing->send_message = NULL;
}
gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
stream_writing->fetching_slice);
} else {
stream_writing->fetching = 1;
}
gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
stream_writing->fetching_slice);
} else {
stream_writing->fetching = 1;
}
}
/* send any body bytes */
if (stream_writing->flow_controlled_buffer.length > 0) {
if (max_outgoing > 0) {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, stream_writing->flow_controlled_buffer.length);
int is_last_data_frame =
stream_writing->send_message == NULL &&
send_bytes == stream_writing->flow_controlled_buffer.length;
int is_last_frame = is_last_data_frame &&
stream_writing->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata);
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer,
send_bytes, is_last_frame, &stream_writing->stats,
&transport_writing->outbuf);
if (is_first_data_frame) {
/* TODO(dgq): this is a hack. It'll be fix in a future refactoring */
stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
is_first_data_frame = false;
/* send any body bytes */
if (stream_writing->flow_controlled_buffer.length > 0) {
if (max_outgoing > 0) {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, stream_writing->flow_controlled_buffer.length);
int is_last_data_frame =
stream_writing->send_message == NULL &&
send_bytes == stream_writing->flow_controlled_buffer.length;
int is_last_frame = is_last_data_frame &&
stream_writing->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata);
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer,
send_bytes, is_last_frame, &stream_writing->stats,
&transport_writing->outbuf);
if (is_first_data_frame) {
/* TODO(dgq): this is a hack. It'll be fix in a future refactoring
*/
stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
is_first_data_frame = false;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
stream_writing, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
outgoing_window, send_bytes);
if (is_last_frame) {
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
if (is_last_data_frame) {
GPR_ASSERT(stream_writing->send_message == NULL);
stream_writing->sent_message = 1;
}
} else if (transport_writing->outgoing_window == 0) {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing,
stream_writing);
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
stream_writing, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
outgoing_window, send_bytes);
if (is_last_frame) {
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
/* send trailing metadata if it's available and we're ready for it */
if (stream_writing->send_message == NULL &&
stream_writing->flow_controlled_buffer.length == 0 &&
stream_writing->send_trailing_metadata != NULL) {
if (grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata)) {
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
&stream_writing->stats, &transport_writing->outbuf);
} else {
grpc_chttp2_encode_header(
&transport_writing->hpack_compressor, stream_writing->id,
stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
&transport_writing->outbuf);
}
if (is_last_data_frame) {
GPR_ASSERT(stream_writing->send_message == NULL);
stream_writing->sent_message = 1;
if (!transport_writing->is_client && !stream_writing->read_closed) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(
stream_writing->id, GRPC_CHTTP2_NO_ERROR,
&stream_writing->stats));
}
} else if (transport_writing->outgoing_window == 0) {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
/* send trailing metadata if it's available and we're ready for it */
if (stream_writing->send_message == NULL &&
stream_writing->flow_controlled_buffer.length == 0 &&
stream_writing->send_trailing_metadata != NULL) {
if (grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata)) {
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
&stream_writing->stats, &transport_writing->outbuf);
} else {
grpc_chttp2_encode_header(
&transport_writing->hpack_compressor, stream_writing->id,
stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
&transport_writing->outbuf);
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
if (!transport_writing->is_client && !stream_writing->read_closed) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(
stream_writing->id, GRPC_CHTTP2_NO_ERROR,
&stream_writing->stats));
}
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
/* if there's more to write, then loop, otherwise prepare to finish the
* write */
if ((stream_writing->flow_controlled_buffer.length > 0 ||
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
/* if there's more to write, then loop, otherwise prepare to finish the
* write */
if ((stream_writing->flow_controlled_buffer.length > 0 ||
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing,
stream_writing);
}
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
} else {
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
GPR_TIMER_END("finalize_outbuf", 0);
}
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
transport_writing)) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"resume_stalled_stream");
GPR_TIMER_END("finalize_outbuf", 0);
}
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
}
grpc_transport_move_one_way_stats(&stream_writing->stats,
&stream_global->stats.outgoing);
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_message_finished, GRPC_ERROR_NONE);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global,
grpc_chttp2_transport_writing * transport_writing) {
GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
if (grpc_chttp2_list_flush_writing_stalled_by_transport(
exec_ctx, transport_writing)) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"resume_stalled_stream");
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1,
GRPC_ERROR_NONE);
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
}
grpc_transport_move_one_way_stats(&stream_writing->stats,
&stream_global->stats.outgoing);
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_message_finished, GRPC_ERROR_NONE);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(
exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}

Loading…
Cancel
Save