|
|
|
@ -998,10 +998,12 @@ static void finalize_outbuf(transport *t) { |
|
|
|
|
|
|
|
|
|
while ((s = stream_list_remove_head(t, WRITING))) { |
|
|
|
|
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, |
|
|
|
|
s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf); |
|
|
|
|
s->send_closed != DONT_SEND_CLOSED, s->id, |
|
|
|
|
&t->hpack_compressor, &t->outbuf); |
|
|
|
|
s->writing_sopb.nops = 0; |
|
|
|
|
if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) { |
|
|
|
|
gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR)); |
|
|
|
|
gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( |
|
|
|
|
s->id, GRPC_CHTTP2_NO_ERROR)); |
|
|
|
|
} |
|
|
|
|
if (s->send_closed != DONT_SEND_CLOSED) { |
|
|
|
|
stream_list_join(t, s, WRITTEN_CLOSED); |
|
|
|
@ -1064,12 +1066,12 @@ static void perform_write(transport *t, grpc_endpoint *ep) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { |
|
|
|
|
static void add_goaway(transport *t, gpr_uint32 goaway_error, |
|
|
|
|
gpr_slice goaway_text) { |
|
|
|
|
if (t->num_pending_goaways == t->cap_pending_goaways) { |
|
|
|
|
t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); |
|
|
|
|
t->pending_goaways = |
|
|
|
|
gpr_realloc(t->pending_goaways, |
|
|
|
|
sizeof(pending_goaway) * t->cap_pending_goaways); |
|
|
|
|
t->pending_goaways = gpr_realloc( |
|
|
|
|
t->pending_goaways, sizeof(pending_goaway) * t->cap_pending_goaways); |
|
|
|
|
} |
|
|
|
|
t->pending_goaways[t->num_pending_goaways].status = |
|
|
|
|
grpc_chttp2_http2_error_to_grpc_status(goaway_error); |
|
|
|
@ -1077,13 +1079,12 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_t |
|
|
|
|
t->num_pending_goaways++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void maybe_start_some_streams(transport *t) { |
|
|
|
|
/* start streams where we have free stream ids and free concurrency */ |
|
|
|
|
while ( |
|
|
|
|
t->next_stream_id <= MAX_CLIENT_STREAM_ID && |
|
|
|
|
grpc_chttp2_stream_map_size(&t->stream_map) < |
|
|
|
|
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { |
|
|
|
|
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && |
|
|
|
|
grpc_chttp2_stream_map_size(&t->stream_map) < |
|
|
|
|
t->settings[PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { |
|
|
|
|
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); |
|
|
|
|
if (!s) return; |
|
|
|
|
|
|
|
|
@ -1091,7 +1092,9 @@ static void maybe_start_some_streams(transport *t) { |
|
|
|
|
t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); |
|
|
|
|
|
|
|
|
|
if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { |
|
|
|
|
add_goaway(t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); |
|
|
|
|
add_goaway( |
|
|
|
|
t, GRPC_CHTTP2_NO_ERROR, |
|
|
|
|
gpr_slice_from_copied_string("Exceeded sequence number limit")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(s->id == 0); |
|
|
|
@ -1109,7 +1112,10 @@ static void maybe_start_some_streams(transport *t) { |
|
|
|
|
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); |
|
|
|
|
if (!s) return; |
|
|
|
|
|
|
|
|
|
cancel_stream(t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, 0); |
|
|
|
|
cancel_stream( |
|
|
|
|
t, s, GRPC_STATUS_UNAVAILABLE, |
|
|
|
|
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, |
|
|
|
|
0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1167,7 +1173,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { |
|
|
|
|
op_closure c; |
|
|
|
|
c.cb = op->on_consumed; |
|
|
|
|
c.user_data = op->on_consumed_user_data; |
|
|
|
|
schedule_cb(t, c, 1);
|
|
|
|
|
schedule_cb(t, c, 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1262,8 +1268,8 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, |
|
|
|
|
/* synthesize a status if we don't believe we'll get one */ |
|
|
|
|
gpr_ltoa(local_status, buffer); |
|
|
|
|
add_incoming_metadata( |
|
|
|
|
t, s, |
|
|
|
|
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); |
|
|
|
|
t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", |
|
|
|
|
buffer)); |
|
|
|
|
if (!optional_message) { |
|
|
|
|
switch (local_status) { |
|
|
|
|
case GRPC_STATUS_CANCELLED: |
|
|
|
@ -1502,7 +1508,8 @@ static int init_header_frame_parser(transport *t, int is_continuation) { |
|
|
|
|
t->last_incoming_stream_id, t->incoming_stream_id); |
|
|
|
|
return init_skip_frame(t, 1); |
|
|
|
|
} else if ((t->incoming_stream_id & 1) == 0) { |
|
|
|
|
gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); |
|
|
|
|
gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", |
|
|
|
|
t->incoming_stream_id); |
|
|
|
|
return init_skip_frame(t, 1); |
|
|
|
|
} |
|
|
|
|
t->incoming_stream = NULL; |
|
|
|
@ -1562,10 +1569,10 @@ static int init_ping_parser(transport *t) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int init_rst_stream_parser(transport *t) { |
|
|
|
|
int ok = GRPC_CHTTP2_PARSE_OK == |
|
|
|
|
grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream, |
|
|
|
|
t->incoming_frame_size, |
|
|
|
|
t->incoming_frame_flags); |
|
|
|
|
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( |
|
|
|
|
&t->simple_parsers.rst_stream, |
|
|
|
|
t->incoming_frame_size, |
|
|
|
|
t->incoming_frame_flags); |
|
|
|
|
if (!ok) { |
|
|
|
|
drop_connection(t); |
|
|
|
|
} |
|
|
|
@ -1655,7 +1662,7 @@ static void add_metadata_batch(transport *t, stream *s) { |
|
|
|
|
we can reconstitute the list. |
|
|
|
|
We can't do list building here as later incoming metadata may reallocate |
|
|
|
|
the underlying array. */ |
|
|
|
|
b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count; |
|
|
|
|
b.list.tail = (void *)(gpr_intptr)s->incoming_metadata_count; |
|
|
|
|
b.garbage.head = b.garbage.tail = NULL; |
|
|
|
|
b.deadline = s->incoming_deadline; |
|
|
|
|
s->incoming_deadline = gpr_inf_future; |
|
|
|
@ -2013,7 +2020,7 @@ static void patch_metadata_ops(stream *s) { |
|
|
|
|
int found_metadata = 0; |
|
|
|
|
|
|
|
|
|
/* rework the array of metadata into a linked list, making use
|
|
|
|
|
of the breadcrumbs we left in metadata batches during
|
|
|
|
|
of the breadcrumbs we left in metadata batches during |
|
|
|
|
add_metadata_batch */ |
|
|
|
|
for (i = 0; i < nops; i++) { |
|
|
|
|
grpc_stream_op *op = &ops[i]; |
|
|
|
@ -2029,11 +2036,11 @@ static void patch_metadata_ops(stream *s) { |
|
|
|
|
op->data.metadata.list.head = &s->incoming_metadata[mdidx]; |
|
|
|
|
op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; |
|
|
|
|
for (j = mdidx + 1; j < last_mdidx; j++) { |
|
|
|
|
s->incoming_metadata[j].prev = &s->incoming_metadata[j-1]; |
|
|
|
|
s->incoming_metadata[j-1].next = &s->incoming_metadata[j]; |
|
|
|
|
s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1]; |
|
|
|
|
s->incoming_metadata[j - 1].next = &s->incoming_metadata[j]; |
|
|
|
|
} |
|
|
|
|
s->incoming_metadata[mdidx].prev = NULL; |
|
|
|
|
s->incoming_metadata[last_mdidx-1].next = NULL; |
|
|
|
|
s->incoming_metadata[last_mdidx - 1].next = NULL; |
|
|
|
|
/* track where we're up to */ |
|
|
|
|
mdidx = last_mdidx; |
|
|
|
|
} |
|
|
|
@ -2045,7 +2052,8 @@ static void patch_metadata_ops(stream *s) { |
|
|
|
|
size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; |
|
|
|
|
GPR_ASSERT(mdidx < s->incoming_metadata_count); |
|
|
|
|
s->incoming_metadata = gpr_malloc(copy_bytes); |
|
|
|
|
memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes); |
|
|
|
|
memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, |
|
|
|
|
copy_bytes); |
|
|
|
|
s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; |
|
|
|
|
} else { |
|
|
|
|
s->incoming_metadata = NULL; |
|
|
|
@ -2082,7 +2090,6 @@ static void finish_reads(transport *t) { |
|
|
|
|
schedule_cb(t, s->recv_done_closure, 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void schedule_cb(transport *t, op_closure closure, int success) { |
|
|
|
|