Beginning integration of buffer pools && chttp2

reviewable/pr8239/r2
Craig Tiller 8 years ago
parent ce2ff3c071
commit 25f29afa84
  1. 104
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 20
      src/core/ext/transport/chttp2/transport/internal.h
  3. 3
      src/core/ext/transport/chttp2/transport/stream_map.h

@ -118,6 +118,15 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error); grpc_error *error);
static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
/******************************************************************************* /*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -240,6 +249,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
grpc_closure_init(&t->read_action_begin, read_action_begin, t); grpc_closure_init(&t->read_action_begin, read_action_begin, t);
grpc_closure_init(&t->read_action_locked, read_action_locked, t); grpc_closure_init(&t->read_action_locked, read_action_locked, t);
grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
grpc_closure_init(&t->benign_reclaimer, benign_reclaimer_locked, t);
grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@ -645,6 +658,13 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
} }
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE("goaway sent"));
}
}
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->write_state) { switch (t->write_state) {
@ -1155,6 +1175,14 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free(msg); gpr_free(msg);
} }
static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_error_code error, gpr_slice data) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data,
&t->qbuf);
grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
void *stream_op, void *stream_op,
grpc_error *error_ignored) { grpc_error *error_ignored) {
@ -1169,15 +1197,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
} }
if (op->send_goaway) { if (op->send_goaway) {
t->sent_goaway = 1; send_goaway(exec_ctx, t,
grpc_chttp2_goaway_append( grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
t->last_new_stream_id, gpr_slice_ref(*op->goaway_message));
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
gpr_slice_ref(*op->goaway_message), &t->qbuf);
close_transport = grpc_chttp2_stream_map_size(&t->stream_map) == 0
? GRPC_ERROR_CREATE("GOAWAY sent")
: GRPC_ERROR_NONE;
grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
} }
if (op->set_accept_stream) { if (op->set_accept_stream) {
@ -1314,11 +1336,20 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->data_parser.parsing_frame = NULL; s->data_parser.parsing_frame = NULL;
} }
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0 && t->sent_goaway) { if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
if (!t->benign_reclaimer_registered) {
t->benign_reclaimer_registered = true;
grpc_buffer_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_buffer_user(t->ep),
false, &t->benign_reclaimer);
}
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
close_transport_locked( close_transport_locked(
exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING( exec_ctx, t,
GRPC_ERROR_CREATE_REFERENCING(
"Last stream closed after sending GOAWAY", &error, 1)); "Last stream closed after sending GOAWAY", &error, 1));
} }
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) { if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
} }
@ -2045,6 +2076,57 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
return incoming_byte_stream; return incoming_byte_stream;
} }
/*******************************************************************************
* BUFFER POOLS
*/
static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
GRPC_ERROR_REF(error), false);
}
static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
GRPC_ERROR_REF(error), false);
}
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
gpr_slice_from_static_string("Buffers full"));
}
t->benign_reclaimer_registered = false;
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
}
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
t->destructive_reclaimer_registered = false;
if (error == GRPC_ERROR_NONE && n > 0) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
grpc_chttp2_cancel_stream(
exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_CHTTP2_ENHANCE_YOUR_CALM));
if (n > 1) {
t->destructive_reclaimer_registered = true;
grpc_buffer_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_buffer_user(t->ep),
true, &t->destructive_reclaimer);
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
}
/******************************************************************************* /*******************************************************************************
* TRACING * TRACING
*/ */

@ -138,6 +138,12 @@ typedef enum {
GRPC_NUM_SETTING_SETS GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set; } grpc_chttp2_setting_set;
typedef enum {
GRPC_CHTTP2_NO_GOAWAY_SEND,
GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
GRPC_CHTTP2_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
/* Outstanding ping request data */ /* Outstanding ping request data */
typedef struct grpc_chttp2_outstanding_ping { typedef struct grpc_chttp2_outstanding_ping {
uint8_t id[8]; uint8_t id[8];
@ -249,7 +255,7 @@ struct grpc_chttp2_transport {
/** have we seen a goaway */ /** have we seen a goaway */
uint8_t seen_goaway; uint8_t seen_goaway;
/** have we sent a goaway */ /** have we sent a goaway */
uint8_t sent_goaway; grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */ /** are the local settings dirty and need to be sent? */
uint8_t dirtied_local_settings; uint8_t dirtied_local_settings;
@ -316,6 +322,18 @@ struct grpc_chttp2_transport {
gpr_slice goaway_text; gpr_slice goaway_text;
grpc_chttp2_write_cb *write_cb_pool; grpc_chttp2_write_cb *write_cb_pool;
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
/** have we scheduled a destructive cleanup? */
bool destructive_reclaimer_registered;
/** benign cleanup closure */
grpc_closure benign_reclaimer;
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
grpc_closure destructive_reclaimer;
grpc_closure destructive_reclaimer_locked;
}; };
typedef enum { typedef enum {

@ -68,6 +68,9 @@ void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, uint32_t key);
/* Return an existing key, or NULL if it does not exist */ /* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key); void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, uint32_t key);
/* Return a random entry */
void *grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map *map);
/* How many (populated) entries are in the stream map? */ /* How many (populated) entries are in the stream map? */
size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map); size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map *map);

Loading…
Cancel
Save