qps_test links with the breakup

pull/2149/head
Craig Tiller 10 years ago
parent 1937b06b78
commit 9850510e52
  1. 3
      src/core/transport/chttp2/hpack_parser.c
  2. 120
      src/core/transport/chttp2/incoming_metadata.c
  3. 24
      src/core/transport/chttp2/incoming_metadata.h
  4. 36
      src/core/transport/chttp2/internal.h
  5. 36
      src/core/transport/chttp2/parsing.c
  6. 99
      src/core/transport/chttp2/stream_lists.c
  7. 6
      src/core/transport/chttp2/stream_map.c
  8. 3
      src/core/transport/chttp2/writing.c
  9. 124
      src/core/transport/chttp2_transport.c

@ -1393,7 +1393,8 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
}
if (parser->is_boundary) {
grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
&stream_parsing->incoming_metadata, &stream_parsing->data_parser.incoming_sopb);
&stream_parsing->incoming_metadata,
&stream_parsing->data_parser.incoming_sopb);
}
if (parser->is_eof) {
stream_parsing->received_close = 1;

@ -40,36 +40,38 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer) {
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer) {
buffer->deadline = gpr_inf_future;
}
void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer) {
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer) {
gpr_free(buffer->elems);
}
void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem *elem) {
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) {
if (buffer->capacity == buffer->count) {
buffer->capacity =
GPR_MAX(8, 2 * buffer->capacity);
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
buffer->elems =
gpr_realloc(buffer->elems,
sizeof(*buffer->elems) *
buffer->capacity);
gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
}
buffer->elems[buffer->count++]
.md = elem;
buffer->elems[buffer->count++].md = elem;
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
buffer->deadline = deadline;
}
#if 0
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
void grpc_chttp2_incoming_metadata_live_op_buffer_end(
grpc_chttp2_incoming_metadata_live_op_buffer *buffer) {
gpr_free(buffer->elems);
buffer->elems = NULL;
}
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) {
grpc_metadata_batch b;
b.list.head = NULL;
@ -77,20 +79,19 @@ void grpc_chttp2_parsing_add_metadata_batch(
we can reconstitute the list.
We can't do list building here as later incoming metadata may reallocate
the underlying array. */
b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
b.list.tail = (void*)(gpr_intptr)buffer->count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = stream_parsing->incoming_deadline;
stream_parsing->incoming_deadline = gpr_inf_future;
b.deadline = buffer->deadline;
buffer->deadline = gpr_inf_future;
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
grpc_sopb_add_metadata(sopb, b);
}
#endif
#if 0
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_stream_op *ops = stream_global->incoming_sopb->ops;
size_t nops = stream_global->incoming_sopb->nops;
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) {
grpc_stream_op *ops = sopb->ops;
size_t nops = sopb->nops;
size_t i;
size_t j;
size_t mdidx = 0;
@ -109,40 +110,59 @@ static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
GPR_ASSERT(last_mdidx <= buffer->count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
op->data.metadata.list.tail =
&stream_parsing->incoming_metadata[last_mdidx - 1];
op->data.metadata.list.head = &buffer->elems[mdidx];
op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
stream_parsing->incoming_metadata[j].prev =
&stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next =
&stream_parsing->incoming_metadata[j];
buffer->elems[j].prev = &buffer->elems[j - 1];
buffer->elems[j - 1].next = &buffer->elems[j];
}
stream_parsing->incoming_metadata[mdidx].prev = NULL;
stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
buffer->elems[mdidx].prev = NULL;
buffer->elems[last_mdidx - 1].next = NULL;
/* track where we're up to */
mdidx = last_mdidx;
}
if (found_metadata) {
stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata;
if (mdidx != stream_parsing->incoming_metadata_count) {
live_op_buffer->elems = buffer->elems;
if (mdidx != buffer->count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
size_t copy_bytes =
sizeof(*stream_parsing->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(stream_parsing->old_incoming_metadata + mdidx,
stream_parsing->incoming_metadata, copy_bytes);
stream_parsing->incoming_metadata_count =
stream_parsing->incoming_metadata_capacity = new_count;
size_t new_count = buffer->count - mdidx;
size_t copy_bytes = sizeof(*buffer->elems) * new_count;
GPR_ASSERT(mdidx < buffer->count);
buffer->elems = gpr_malloc(copy_bytes);
memcpy(live_op_buffer->elems + mdidx, buffer->elems, copy_bytes);
buffer->count = buffer->capacity = new_count;
} else {
stream_parsing->incoming_metadata = NULL;
stream_parsing->incoming_metadata_count = 0;
stream_parsing->incoming_metadata_capacity = 0;
buffer->elems = NULL;
buffer->count = 0;
buffer->capacity = 0;
}
}
}
#if 0
void grpc_chttp2_parsing_add_metadata_batch(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
grpc_metadata_batch b;
b.list.head = NULL;
/* Store away the last element of the list, so that in patch_metadata_ops
we can reconstitute the list.
We can't do list building here as later incoming metadata may reallocate
the underlying array. */
b.list.tail = (void *)(gpr_intptr)stream_parsing->incoming_metadata_count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = stream_parsing->incoming_deadline;
stream_parsing->incoming_deadline = gpr_inf_future;
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
#endif
#if 0
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global,
grpc_chttp2_stream_parsing *stream_parsing) {
}
#endif

@ -48,21 +48,29 @@ typedef struct {
} grpc_chttp2_incoming_metadata_live_op_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_add(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
/** extend sopb with a metadata batch; this must be post-processed by
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed
out of the transport */
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
void grpc_chttp2_incoming_metadata_live_op_buffer_end(grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
void grpc_chttp2_incoming_metadata_live_op_buffer_end(
grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
#endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */

@ -88,7 +88,7 @@ typedef enum {
PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE,
NEW_OUTGOING_WINDOW,
#endif
#endif
STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
@ -466,13 +466,13 @@ struct grpc_chttp2_stream_parsing {
/** incoming metadata */
grpc_chttp2_incoming_metadata_buffer incoming_metadata;
/*
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
*/
/*
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
*/
};
struct grpc_chttp2_stream {
@ -599,14 +599,22 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text);
void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text);
void grpc_chttp2_remove_from_stream_map(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_remove_from_stream_map(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global));
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \

@ -61,8 +61,7 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice, int is_last);
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing) {
}
grpc_chttp2_transport_parsing *parsing) {}
void grpc_chttp2_publish_reads(
grpc_chttp2_transport_global *transport_global,
@ -134,7 +133,9 @@ void grpc_chttp2_publish_reads(
/* move goaway to the global state if we received one (it will be
published later */
if (transport_parsing->goaway_received) {
grpc_chttp2_add_incoming_goaway(transport_global, transport_parsing->goaway_error, transport_parsing->goaway_text);
grpc_chttp2_add_incoming_goaway(transport_global,
transport_parsing->goaway_error,
transport_parsing->goaway_text);
transport_parsing->goaway_received = 0;
}
@ -164,11 +165,13 @@ void grpc_chttp2_publish_reads(
/* updating closed status */
if (stream_parsing->received_close) {
stream_global->read_closed = 1;
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
if (stream_parsing->saw_rst_stream) {
stream_global->cancelled = 1;
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
}
}
@ -486,10 +489,10 @@ static int init_data_frame_parser(
stream_parsing->received_close = 1;
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(
transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
gpr_slice_buffer_add(
&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
return init_skip_frame_parser(transport_parsing, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
@ -526,10 +529,13 @@ static void on_header(void *tp, grpc_mdelem *md) {
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(&stream_parsing->incoming_metadata, gpr_time_add(gpr_now(), *cached_timeout));
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&stream_parsing->incoming_metadata,
gpr_time_add(gpr_now(), *cached_timeout));
grpc_mdelem_unref(md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, md);
grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata,
md);
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
@ -711,10 +717,10 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
if (stream_parsing) {
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(
transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
gpr_slice_buffer_add(
&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
}
return 1;
case GRPC_CHTTP2_CONNECTION_ERROR:

@ -35,29 +35,26 @@
#include <grpc/support/log.h>
#define TRANSPORT_FROM_GLOBAL(tg) \
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \
global)))
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
#define TRANSPORT_FROM_WRITING(tw) \
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
#define STREAM_FROM_WRITING(sw) \
((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, \
writing)))
((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, writing)))
#define TRANSPORT_FROM_PARSING(tp) \
#define TRANSPORT_FROM_PARSING(tp) \
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
#define STREAM_FROM_PARSING(sp) \
((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, \
parsing)))
((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, parsing)))
/* core list management */
@ -66,8 +63,9 @@ static int stream_list_empty(grpc_chttp2_transport *t,
return t->lists[id].head == NULL;
}
static int stream_list_pop(
grpc_chttp2_transport *t, grpc_chttp2_stream **stream, grpc_chttp2_stream_list_id id) {
static int stream_list_pop(grpc_chttp2_transport *t,
grpc_chttp2_stream **stream,
grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@ -121,7 +119,7 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
}
static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
}
@ -133,7 +131,8 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_pop_writable_stream(
@ -142,7 +141,8 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE);
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE);
*stream_global = &stream->global;
*stream_writing = &stream->writing;
return r;
@ -151,19 +151,23 @@ int grpc_chttp2_list_pop_writable_stream(
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITING);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing) {
return stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing), GRPC_CHTTP2_LIST_WRITING);
return stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing),
GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_pop_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITING);
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITING);
*stream_writing = &stream->writing;
return r;
}
@ -171,7 +175,9 @@ int grpc_chttp2_list_pop_writing_stream(
void grpc_chttp2_list_add_written_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITTEN);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITTEN);
}
int grpc_chttp2_list_pop_written_stream(
@ -180,7 +186,8 @@ int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITTEN);
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITTEN);
*stream_writing = &stream->writing;
return r;
}
@ -188,14 +195,17 @@ int grpc_chttp2_list_pop_written_stream(
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
}
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global;
return r;
}
@ -203,7 +213,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream(
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), STREAM_FROM_PARSING(stream_parsing), GRPC_CHTTP2_LIST_PARSING_SEEN);
stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing),
STREAM_FROM_PARSING(stream_parsing),
GRPC_CHTTP2_LIST_PARSING_SEEN);
}
int grpc_chttp2_list_pop_parsing_seen_stream(
@ -212,7 +224,8 @@ int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, GRPC_CHTTP2_LIST_PARSING_SEEN);
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream,
GRPC_CHTTP2_LIST_PARSING_SEEN);
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
return r;
@ -221,14 +234,17 @@ int grpc_chttp2_list_pop_parsing_seen_stream(
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
*stream_global = &stream->global;
return r;
}
@ -236,14 +252,17 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
void grpc_chttp2_list_add_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
}
int grpc_chttp2_list_pop_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
*stream_global = &stream->global;
return r;
}
@ -251,26 +270,38 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_parsing(
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
}
void grpc_chttp2_list_add_incoming_window_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED);
}
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)) {
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global)) {
grpc_chttp2_stream *s;
for (s = TRANSPORT_FROM_GLOBAL(transport_global)->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) {
for (s = TRANSPORT_FROM_GLOBAL(transport_global)
->lists[GRPC_CHTTP2_LIST_ALL_STREAMS]
.head;
s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) {
cb(transport_global, user_data, &s->global);
}
}

@ -96,7 +96,8 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key,
map->count = count + 1;
}
void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst) {
void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src,
grpc_chttp2_stream_map *dst) {
/* if src is empty we dont need to do anything */
if (src->count == src->free) {
return;
@ -115,7 +116,8 @@ void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_s
/* the first element of src must be greater than the last of dst */
GPR_ASSERT(src->keys[0] > dst->keys[dst->count - 1]);
memcpy(dst->keys + dst->count, src->keys, src->count * sizeof(gpr_uint32));
memcpy(dst->values + dst->count, src->values, src->count * sizeof(gpr_uint32));
memcpy(dst->values + dst->count, src->values,
src->count * sizeof(gpr_uint32));
dst->count += src->count;
dst->free += src->free;
src->count = 0;

@ -192,7 +192,8 @@ void grpc_chttp2_cleanup_writing(
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
}
transport_writing->outbuf.count = 0;

@ -66,13 +66,12 @@ int grpc_flowctl_trace = 0;
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
parsing)))
#define TRANSPORT_FROM_GLOBAL(tg) \
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \
global)))
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
static const grpc_transport_vtable vtable;
@ -98,13 +97,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void drop_connection(grpc_chttp2_transport *t);
/** Perform a transport_op */
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op);
static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_transport_op *op);
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
@ -394,7 +394,8 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
grpc_chttp2_stream *s = grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
grpc_chttp2_stream *s =
grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
return &s->parsing;
}
@ -404,7 +405,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
GPR_ASSERT(t->accepting_stream == NULL);
t->accepting_stream = &accepting;
t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base, (void *)(gpr_uintptr)id);
t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data,
&t->base, (void *)(gpr_uintptr)id);
t->accepting_stream = NULL;
return &accepting->parsing;
}
@ -508,8 +510,9 @@ static void writing_action(void *gt, int iomgr_success_ignored) {
grpc_chttp2_perform_writes(&t->writing, t->ep);
}
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
transport_global->goaway_text = goaway_text;
@ -519,18 +522,21 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_glo
}
}
static void maybe_start_some_streams(grpc_chttp2_transport_global *transport_global) {
static void maybe_start_some_streams(
grpc_chttp2_transport_global *transport_global) {
grpc_chttp2_stream_global *stream_global;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
transport_global->concurrent_stream_count <
transport_global->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) {
IF_TRACING(gpr_log(
GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id));
transport_global->settings
[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
transport_global->is_client ? "CLI" : "SVR",
stream_global, transport_global->next_stream_id));
if (transport_global->next_stream_id == MAX_CLIENT_STREAM_ID) {
grpc_chttp2_add_incoming_goaway(
@ -545,20 +551,25 @@ static void maybe_start_some_streams(grpc_chttp2_transport_global *transport_glo
transport_global
->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_global->incoming_window =
transport_global->
settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global));
transport_global
->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id > MAX_CLIENT_STREAM_ID &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) {
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
}
}
static void perform_op_locked(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_op *op) {
static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
@ -572,19 +583,20 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr
stream_global->write_state = WRITE_STATE_QUEUED_CLOSE;
}
if (stream_global->id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
transport_global->is_client ? "CLI" : "SVR", stream_global));
grpc_chttp2_list_add_waiting_for_concurrency(
transport_global, stream_global
);
IF_TRACING(gpr_log(
GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
transport_global->is_client ? "CLI" : "SVR", stream_global));
grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
stream_global);
maybe_start_some_streams(transport_global);
} else if (stream_global->outgoing_window > 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
} else {
grpc_sopb_reset(op->send_ops);
grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 0);
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 0);
}
}
@ -594,13 +606,17 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr
stream_global->recv_done_closure = op->on_done_recv;
stream_global->incoming_sopb = op->recv_ops;
stream_global->incoming_sopb->nops = 0;
grpc_chttp2_incoming_metadata_live_op_buffer_end(&stream_global->outstanding_metadata);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_incoming_window_state_changed(transport_global, stream_global);
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
grpc_chttp2_list_add_incoming_window_state_changed(transport_global,
stream_global);
}
if (op->bind_pollset) {
add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), op->bind_pollset);
add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global),
op->bind_pollset);
}
if (op->on_consumed) {
@ -651,11 +667,13 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
we are not parsing before continuing the cancellation to keep things in
a sane state */
if (!t->parsing_active) {
while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, &stream_global)) {
while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global,
&stream_global)) {
GPR_ASSERT(stream_global->in_stream_map);
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id);
stream_global->in_stream_map = 0;
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
}
@ -674,17 +692,18 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
#endif
}
static void cancel_from_api(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
stream_global->cancelled = 1;
if (stream_global->in_stream_map) {
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id,
grpc_chttp2_grpc_status_to_http2_status(status)));
grpc_chttp2_rst_stream_create(
stream_global->id,
grpc_chttp2_grpc_status_to_http2_status(status)));
} else {
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
}
@ -773,7 +792,9 @@ static void cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
#endif
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) {
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
}
@ -848,8 +869,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->parsing_active = 1;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (;
i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
i++) {
gpr_slice_unref(slices[i]);
}
@ -954,7 +974,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
return;
}
if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) {
if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN &&
if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN &&
t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
notify_goaways_args *a = gpr_malloc(sizeof(*a));
a->error = t->global.goaway_error;
@ -973,12 +993,14 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
t->channel_callback.executing = 1;
ref_transport(t);
grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed, 1);
grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed,
1);
}
}
void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
closure->next = transport_global->pending_closures;
transport_global->pending_closures = closure;

Loading…
Cancel
Save