From 25f29afa8488b6c714d15ca46ff8d17df420233b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 26 Sep 2016 16:31:00 -0700 Subject: [PATCH] Beginning integration of buffer pools && chttp2 --- .../chttp2/transport/chttp2_transport.c | 108 +++++++++++++++--- .../ext/transport/chttp2/transport/internal.h | 20 +++- .../transport/chttp2/transport/stream_map.h | 3 + 3 files changed, 117 insertions(+), 14 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 77ae890c066..fcfe6e2955e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -118,6 +118,15 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -240,6 +249,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); grpc_closure_init(&t->read_action_begin, read_action_begin, t); grpc_closure_init(&t->read_action_locked, read_action_locked, t); + grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); + grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); + grpc_closure_init(&t->benign_reclaimer, benign_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -645,6 +658,13 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); } + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent")); + } + } + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); switch (t->write_state) { @@ -1155,6 +1175,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_free(msg); } +static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_error_code error, gpr_slice data) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; + grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data, + &t->qbuf); + grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); +} + static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { @@ -1169,15 +1197,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_goaway) { - t->sent_goaway = 1; - grpc_chttp2_goaway_append( - t->last_new_stream_id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - gpr_slice_ref(*op->goaway_message), &t->qbuf); - close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0 - ? GRPC_ERROR_CREATE("GOAWAY sent") - : GRPC_ERROR_NONE; - grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); + send_goaway(exec_ctx, t, + grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), + gpr_slice_ref(*op->goaway_message)); } if (op->set_accept_stream) { @@ -1314,10 +1336,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->data_parser.parsing_frame = NULL; } - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) { - close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING( - "Last stream closed after sending GOAWAY", &error, 1)); + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + if (!t->benign_reclaimer_registered) { + t->benign_reclaimer_registered = true; + grpc_buffer_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep), + false, &t->benign_reclaimer); + } + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { + close_transport_locked( + exec_ctx, t, + GRPC_ERROR_CREATE_REFERENCING( + "Last stream closed after sending GOAWAY", &error, 1)); + } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); @@ -2045,6 +2076,57 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( return incoming_byte_stream; } +/******************************************************************************* + * BUFFER POOLS + */ + +static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked, + GRPC_ERROR_REF(error), false); +} + +static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, + gpr_slice_from_static_string("Buffers full")); + } + t->benign_reclaimer_registered = false; + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); +} + +static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); + grpc_chttp2_cancel_stream( + exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_CHTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + t->destructive_reclaimer_registered = true; + grpc_buffer_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep), + true, &t->destructive_reclaimer); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); +} + /******************************************************************************* * TRACING */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 3263c99bde5..42e05a2b2ba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -138,6 +138,12 @@ typedef enum { GRPC_NUM_SETTING_SETS } grpc_chttp2_setting_set; +typedef enum { + GRPC_CHTTP2_NO_GOAWAY_SEND, + GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED, + GRPC_CHTTP2_GOAWAY_SENT, +} grpc_chttp2_sent_goaway_state; + /* Outstanding ping request data */ typedef struct grpc_chttp2_outstanding_ping { uint8_t id[8]; @@ -249,7 +255,7 @@ struct grpc_chttp2_transport { /** have we seen a goaway */ uint8_t seen_goaway; /** have we sent a goaway */ - uint8_t sent_goaway; + grpc_chttp2_sent_goaway_state sent_goaway_state; /** are the local settings dirty and need to be sent? */ uint8_t dirtied_local_settings; @@ -316,6 +322,18 @@ struct grpc_chttp2_transport { gpr_slice goaway_text; grpc_chttp2_write_cb *write_cb_pool; + + /* buffer pool state */ + /** have we scheduled a benign cleanup? */ + bool benign_reclaimer_registered; + /** have we scheduled a destructive cleanup? */ + bool destructive_reclaimer_registered; + /** benign cleanup closure */ + grpc_closure benign_reclaimer; + grpc_closure benign_reclaimer_locked; + /** destructive cleanup closure */ + grpc_closure destructive_reclaimer; + grpc_closure destructive_reclaimer_locked; }; typedef enum { diff --git a/src/core/ext/transport/chttp2/transport/stream_map.h b/src/core/ext/transport/chttp2/transport/stream_map.h index e76312dd1a0..203f6406802 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.h +++ b/src/core/ext/transport/chttp2/transport/stream_map.h @@ -68,6 +68,9 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key); /* Return an existing key, or NULL if it does not exist */ void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key); +/* Return a random entry */ +void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map); + /* How many (populated) entries are in the stream map? */ size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map);