Remove global list of streams from transport

pull/7793/head
Craig Tiller 9 years ago
parent d2e161413d
commit 032b515d56
  1. 35
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 13
      src/core/ext/transport/chttp2/transport/internal.h
  3. 27
      src/core/ext/transport/chttp2/transport/stream_lists.c

@ -465,13 +465,6 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) {
grpc_chttp2_stream *s = sp;
grpc_chttp2_register_stream(s->t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init");
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
@ -507,13 +500,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
grpc_chttp2_register_stream(t, s);
s->global.in_stream_map = true;
} else {
grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
GRPC_CHTTP2_STREAM_REF(&s->global, "init");
grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
GRPC_ERROR_NONE);
}
GPR_TIMER_END("init_stream", 0);
@ -532,11 +519,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(
exec_ctx, t,
GRPC_ERROR_CREATE("Last stream closed after sending goaway"));
}
if (s->global.id != 0) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->global.id) ==
NULL);
@ -1254,7 +1236,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
t->global.last_incoming_stream_id,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
close_transport = grpc_chttp2_has_streams(t)
close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("GOAWAY sent");
grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent");
@ -1408,7 +1390,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->global.data_parser.parsing_frame = NULL;
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 &&
t->global.sent_goaway) {
close_transport_locked(
exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING(
"Last stream closed after sending GOAWAY", &error, 1));
@ -1738,20 +1721,20 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_exec_ctx *exec_ctx;
grpc_error *error;
grpc_chttp2_transport *t;
} cancel_stream_cb_args;
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
static void cancel_stream_cb(void *user_data, uint32_t key, void *stream) {
cancel_stream_cb_args *args = user_data;
cancel_from_api(args->exec_ctx, transport_global, stream_global,
grpc_chttp2_stream *s = stream;
cancel_from_api(args->exec_ctx, &args->t->global, &s->global,
GRPC_ERROR_REF(args->error));
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
cancel_stream_cb_args args = {exec_ctx, error};
grpc_chttp2_for_all_streams(&t->global, &args, cancel_stream_cb);
cancel_stream_cb_args args = {exec_ctx, error, t};
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
GRPC_ERROR_UNREF(error);
}

@ -59,7 +59,6 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
/* streams are kept in various linked lists depending on what things need to
happen to them... this enum labels each list */
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
GRPC_CHTTP2_LIST_CHECK_READ_OPS,
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
@ -475,7 +474,6 @@ struct grpc_chttp2_stream {
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_closure init_stream;
grpc_closure destroy_stream;
void *destroy_stream_arg;
@ -601,17 +599,6 @@ void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t goaway_error, gpr_slice goaway_text);
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/* returns 1 if this is the last stream, 0 otherwise */
int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
int grpc_chttp2_has_streams(grpc_chttp2_transport *t);
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_parsing_become_skip_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);

@ -353,30 +353,3 @@ int grpc_chttp2_list_pop_closed_waiting_for_writing(
}
return r;
}
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
int grpc_chttp2_has_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global)) {
grpc_chttp2_stream *s;
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
for (s = t->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s != NULL;
s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) {
cb(transport_global, user_data, &s->global);
}
}

Loading…
Cancel
Save