New chttp2 list implementation

pull/2149/head
Craig Tiller 10 years ago
parent b787c50dec
commit 6459db484a
  1. 2
      BUILD
  2. 2
      Makefile
  3. 1
      build.json
  4. 24
      src/core/transport/chttp2/internal.h
  5. 4
      src/core/transport/chttp2/parsing.c
  6. 177
      src/core/transport/chttp2/stream_lists.c
  7. 2
      src/core/transport/chttp2/writing.c
  8. 12
      src/core/transport/chttp2_transport.c
  9. 2
      tools/doxygen/Doxyfile.core.internal
  10. 2
      vsprojects/grpc/grpc.vcxproj
  11. 3
      vsprojects/grpc/grpc.vcxproj.filters
  12. 2
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  13. 3
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -337,6 +337,7 @@ cc_library(
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",
"src/core/transport/chttp2/stream_lists.c",
"src/core/transport/chttp2/stream_map.c",
"src/core/transport/chttp2/timeout_encoding.c",
"src/core/transport/chttp2/varint.c",
@ -543,6 +544,7 @@ cc_library(
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",
"src/core/transport/chttp2/stream_lists.c",
"src/core/transport/chttp2/stream_map.c",
"src/core/transport/chttp2/timeout_encoding.c",
"src/core/transport/chttp2/varint.c",

@ -3033,6 +3033,7 @@ LIBGRPC_SRC = \
src/core/transport/chttp2/parsing.c \
src/core/transport/chttp2/status_conversion.c \
src/core/transport/chttp2/stream_encoder.c \
src/core/transport/chttp2/stream_lists.c \
src/core/transport/chttp2/stream_map.c \
src/core/transport/chttp2/timeout_encoding.c \
src/core/transport/chttp2/varint.c \
@ -3282,6 +3283,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/chttp2/parsing.c \
src/core/transport/chttp2/status_conversion.c \
src/core/transport/chttp2/stream_encoder.c \
src/core/transport/chttp2/stream_lists.c \
src/core/transport/chttp2/stream_map.c \
src/core/transport/chttp2/timeout_encoding.c \
src/core/transport/chttp2/varint.c \

@ -277,6 +277,7 @@
"src/core/transport/chttp2/parsing.c",
"src/core/transport/chttp2/status_conversion.c",
"src/core/transport/chttp2/stream_encoder.c",
"src/core/transport/chttp2/stream_lists.c",
"src/core/transport/chttp2/stream_map.c",
"src/core/transport/chttp2/timeout_encoding.c",
"src/core/transport/chttp2/varint.c",

@ -55,7 +55,14 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
happen to them... this enum labels each list */
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED,
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@ -574,16 +581,18 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
void grpc_chttp2_read_write_state_changed(
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_incoming_window_state_changed(
void grpc_chttp2_list_add_incoming_window_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
@ -594,11 +603,10 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport_global *transport_glo
void grpc_chttp2_remove_from_stream_map(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s);
void grpc_chttp2_for_all_streams(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_flowctl_trace(grpc_chttp2_transport *t, const char *flow,
gpr_int32 window, gpr_uint32 id, gpr_int32 delta);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)

@ -160,11 +160,11 @@ void grpc_chttp2_publish_reads(
/* updating closed status */
if (stream_parsing->received_close) {
stream_global->read_closed = 1;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
}
if (stream_parsing->saw_rst_stream) {
stream_global->cancelled = 1;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
}
}
}

@ -31,6 +31,34 @@
*
*/
#include "src/core/transport/chttp2/internal.h"
#include <grpc/support/log.h>
#define TRANSPORT_FROM_GLOBAL(tg) \
((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
global)))
#define STREAM_FROM_GLOBAL(sg) \
((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, \
global)))
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
#define STREAM_FROM_WRITING(sw) \
((grpc_chttp2_stream *)((char *)(sw)-offsetof(grpc_chttp2_stream, \
writing)))
#define TRANSPORT_FROM_PARSING(tp) \
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
#define STREAM_FROM_PARSING(sp) \
((grpc_chttp2_stream *)((char *)(sp)-offsetof(grpc_chttp2_stream, \
parsing)))
/* core list management */
static int stream_list_empty(grpc_chttp2_transport *t,
@ -38,8 +66,8 @@ static int stream_list_empty(grpc_chttp2_transport *t,
return t->lists[id].head == NULL;
}
static grpc_chttp2_stream *stream_list_remove_head(
grpc_chttp2_transport *t, grpc_chttp2_stream_list_id id) {
static int stream_list_pop(
grpc_chttp2_transport *t, grpc_chttp2_stream **stream, grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@ -53,12 +81,13 @@ static grpc_chttp2_stream *stream_list_remove_head(
}
s->included[id] = 0;
}
return s;
*stream = s;
return s != 0;
}
static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (!s->included[id]) return;
GPR_ASSERT(s->included[id]);
s->included[id] = 0;
if (s->links[id].prev) {
s->links[id].prev->links[id].next = s->links[id].next;
@ -91,7 +120,7 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}
static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
return;
@ -99,3 +128,141 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
stream_list_add_tail(t, s, id);
}
/* wrappers for specializations */
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE);
*stream_global = &stream->global;
*stream_writing = &stream->writing;
return r;
}
void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing) {
return stream_list_empty(TRANSPORT_FROM_WRITING(transport_writing), GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_pop_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITING);
*stream_writing = &stream->writing;
return r;
}
void grpc_chttp2_list_add_written_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_WRITTEN);
}
int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream, GRPC_CHTTP2_LIST_WRITTEN);
*stream_writing = &stream->writing;
return r;
}
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
}
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global;
return r;
}
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
stream_list_add(TRANSPORT_FROM_PARSING(transport_parsing), STREAM_FROM_PARSING(stream_parsing), GRPC_CHTTP2_LIST_PARSING_SEEN);
}
int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_parsing **stream_parsing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream, GRPC_CHTTP2_LIST_PARSING_SEEN);
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
return r;
}
void grpc_chttp2_list_add_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
*stream_global = &stream->global;
return r;
}
void grpc_chttp2_list_add_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
}
int grpc_chttp2_list_pop_cancelled_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING);
*stream_global = &stream->global;
return r;
}
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
}
void grpc_chttp2_list_add_incoming_window_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_INCOMING_WINDOW_STATE_CHANGED);
}
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}

@ -192,7 +192,7 @@ void grpc_chttp2_cleanup_writing(
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
}
}
transport_writing->outbuf.count = 0;

@ -378,6 +378,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
ref_transport(t);
lock(t);
grpc_chttp2_register_stream(t, s);
if (server_data) {
GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
@ -405,6 +406,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0);
grpc_chttp2_unregister_stream(t, s);
gpr_mu_unlock(&t->mu);
@ -604,8 +606,8 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, gr
stream_global->incoming_sopb = op->recv_ops;
stream_global->incoming_sopb->nops = 0;
grpc_chttp2_incoming_metadata_live_op_buffer_end(&stream_global->outstanding_metadata);
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_incoming_window_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_incoming_window_state_changed(transport_global, stream_global);
}
if (op->bind_pollset) {
@ -664,7 +666,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
GPR_ASSERT(stream_global->in_stream_map);
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id);
stream_global->in_stream_map = 0;
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
}
}
@ -678,7 +680,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->global.read_closed = 1;
s->global.write_state = WRITE_STATE_SENT_CLOSE;
grpc_chttp2_read_write_state_changed(&t->global, &s->global);
grpc_chttp2_list_add_read_write_state_changed(&t->global, &s->global);
}
#endif
}
@ -693,7 +695,7 @@ static void cancel_from_api(
grpc_chttp2_rst_stream_create(stream_global->id,
grpc_chttp2_grpc_status_to_http2_status(status)));
} else {
grpc_chttp2_read_write_state_changed(transport_global, stream_global);
grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global);
}
}

File diff suppressed because one or more lines are too long

@ -477,6 +477,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_encoder.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_lists.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_map.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c">

@ -331,6 +331,9 @@
<ClCompile Include="..\..\src\core\transport\chttp2\stream_encoder.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_lists.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_map.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>

@ -415,6 +415,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_encoder.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_lists.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_map.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\timeout_encoding.c">

@ -265,6 +265,9 @@
<ClCompile Include="..\..\src\core\transport\chttp2\stream_encoder.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_lists.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\chttp2\stream_map.c">
<Filter>src\core\transport\chttp2</Filter>
</ClCompile>

Loading…
Cancel
Save