Progress on splitting reading from transport lock

pull/2149/head
Craig Tiller 10 years ago
parent 3719f07233
commit 606d874d16
  1. include/grpc/support/slice_buffer.h (2 changes)
  2. src/core/transport/chttp2/internal.h (132 changes)
  3. src/core/transport/chttp2/parsing.c (137 changes)
  4. src/core/transport/chttp2/stream_map.h (3 changes)
  5. src/core/transport/chttp2_transport.c (350 changes)

@@ -84,6 +84,8 @@ void gpr_slice_buffer_pop(gpr_slice_buffer *sb);
void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb);
/* swap the contents of two slice buffers */
void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
/* move all of the elements of src into dst */
void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
#ifdef __cplusplus
}
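The new gpr_slice_buffer_move_into is the primitive this commit uses to hand buffered slices across the global/parsing boundary. A minimal usage sketch (hypothetical helper name; mirrors how grpc_chttp2_publish_reads below drains the parsing qbuf into the global qbuf):

static void hand_off_qbuf(gpr_slice_buffer *parsing_qbuf,
                          gpr_slice_buffer *global_qbuf) {
  /* appends every slice of parsing_qbuf to global_qbuf in order and leaves
     parsing_qbuf empty; the slice refs move with the slices, no copying */
  gpr_slice_buffer_move_into(parsing_qbuf, global_qbuf);
}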

@@ -143,9 +143,9 @@ typedef struct {
} grpc_chttp2_stream_link;
typedef enum {
ERROR_STATE_NONE,
ERROR_STATE_SEEN,
ERROR_STATE_NOTIFIED
GRPC_CHTTP2_ERROR_STATE_NONE,
GRPC_CHTTP2_ERROR_STATE_SEEN,
GRPC_CHTTP2_ERROR_STATE_NOTIFIED
} grpc_chttp2_error_state;
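The renamed enum is a one-way latch for connection-level errors: a transport starts in GRPC_CHTTP2_ERROR_STATE_NONE, drop_connection moves it to GRPC_CHTTP2_ERROR_STATE_SEEN, and it reaches GRPC_CHTTP2_ERROR_STATE_NOTIFIED once the channel callback has been informed (the old destroy path in chttp2_transport.c asserted NOTIFIED before tearing down).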
/* We keep several sets of connection wide parameters */
@@ -198,11 +198,32 @@ typedef struct {
/** settings values */
gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
/** has there been a connection level error, and have we notified
anyone about it? */
grpc_chttp2_error_state error_state;
/** what is the next stream id to be allocated by this peer?
copied to next_stream_id in parsing when parsing commences */
gpr_uint32 next_stream_id;
/** last received stream id */
gpr_uint32 last_incoming_stream_id;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
/** next payload for an outgoing ping */
gpr_uint64 ping_counter;
/** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count;
/** is there a goaway available? */
gpr_uint8 have_goaway;
/** what is the debug text of the goaway? */
gpr_slice goaway_text;
/** what is the status code of the goaway? */
grpc_status_code goaway_error;
} grpc_chttp2_transport_global;
typedef struct {
@@ -279,7 +300,6 @@ struct grpc_chttp2_transport_parsing {
grpc_chttp2_outstanding_ping pings;
};
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
@@ -287,18 +307,67 @@ struct grpc_chttp2_transport {
gpr_refcount refs;
gpr_mu mu;
gpr_cv cv;
/** is the transport destroying itself? */
gpr_uint8 destroying;
/** has the upper layer closed the transport? */
gpr_uint8 closed;
/** is a thread currently writing */
gpr_uint8 writing_active;
/** is a thread currently parsing */
gpr_uint8 parsing_active;
/** is there a read request to the endpoint outstanding? */
gpr_uint8 endpoint_reading;
/** various lists of streams */
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
/** global state for reading/writing */
grpc_chttp2_transport_global global;
/** state only accessible by the chain of execution that
set writing_active=1 */
grpc_chttp2_transport_writing writing;
/** state only accessible by the chain of execution that
set parsing_active=1 */
grpc_chttp2_transport_parsing parsing;
/** maps stream id to grpc_chttp2_stream objects;
owned by the parsing thread when parsing */
grpc_chttp2_stream_map parsing_stream_map;
/** streams created by the client (possibly during parsing);
merged with parsing_stream_map during unlock when no
parsing is occurring */
grpc_chttp2_stream_map new_stream_map;
/** closure to execute writing */
grpc_iomgr_closure writing_action;
/** address to place a newly accepted stream - set and unset by
grpc_chttp2_parsing_accept_stream; used by init_stream to
publish the accepted server stream */
grpc_chttp2_stream **accepting_stream;
struct {
/** is a thread currently performing channel callbacks */
gpr_uint8 executing;
/** transport channel-level callback */
const grpc_transport_callbacks *cb;
/** user data for cb calls */
void *cb_user_data;
/** closure for notifying transport closure */
grpc_iomgr_closure notify_closed;
} channel_callback;
#if 0
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
grpc_chttp2_error_state error_state;
/* stream indexing */
gpr_uint32 next_stream_id;
@@ -306,40 +375,19 @@ struct grpc_chttp2_transport {
/* window management */
gpr_uint32 outgoing_window_update;
/* goaway */
grpc_chttp2_pending_goaway *pending_goaways;
size_t num_pending_goaways;
size_t cap_pending_goaways;
/* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb;
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_map stream_map;
/* pings */
gpr_int64 ping_counter;
grpc_chttp2_transport_global global;
grpc_chttp2_transport_writing writing;
grpc_chttp2_transport_parsing parsing;
/** closure to execute writing */
grpc_iomgr_closure writing_action;
grpc_chttp2_stream **accepting_stream;
struct {
/** is a thread currently performing channel callbacks */
gpr_uint8 executing;
/** transport channel-level callback */
const grpc_transport_callbacks *cb;
/** user data for cb calls */
void *cb_user_data;
/** closure for notifying transport closure */
grpc_iomgr_closure notify_closed;
} channel_callback;
#endif
};
typedef struct {
@@ -361,6 +409,13 @@ typedef struct {
grpc_chttp2_write_state write_state;
/** is this stream closed (boolean) */
gpr_uint8 read_closed;
/** stream state already published to the upper layer */
grpc_stream_state published_state;
/** address to publish next stream state to */
grpc_stream_state *publish_state;
/** pointer to sop buffer to fill in with new stream ops */
grpc_stream_op_buffer *incoming_sopb;
} grpc_chttp2_stream_global;
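These three fields park an upper layer's receive request until a parse completes. A hedged sketch of the handshake, condensed from perform_op_locked and the retired unlock_check_parser later in this commit:

/* upper layer posts a receive request: */
s->global.recv_done_closure = op->on_done_recv;
s->global.incoming_sopb = op->recv_ops;    /* sopb the transport will fill */
s->global.incoming_sopb->nops = 0;
s->global.publish_state = op->recv_state;  /* where new state gets written */
/* when reads are published, the transport stores to *publish_state and
   compares against published_state to decide whether to fire the closure */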
typedef struct {
@@ -377,12 +432,12 @@ struct grpc_chttp2_stream_parsing {
gpr_uint32 id;
/** has this stream received a close */
gpr_uint8 received_close;
/** incoming_window has been reduced during parsing */
gpr_uint8 incoming_window_changed;
/** saw an error on this stream during parsing (it should be cancelled) */
gpr_uint8 saw_error;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
/** incoming_window has been reduced by this much during parsing */
gpr_uint32 incoming_window_delta;
/** window available for peer to send to us */
gpr_uint32 incoming_window;
/** parsing state for data frames */
@@ -403,20 +458,18 @@ struct grpc_chttp2_stream_parsing {
struct grpc_chttp2_stream {
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
gpr_uint32 outgoing_window_update;
gpr_uint8 cancelled;
grpc_chttp2_stream_parsing parsing;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
/* sops from application */
grpc_stream_op_buffer *incoming_sopb;
grpc_stream_state *publish_state;
grpc_stream_state published_state;
#if 0
gpr_uint32 outgoing_window_update;
gpr_uint8 cancelled;
grpc_stream_state callback_state;
grpc_stream_op_buffer callback_sopb;
#endif
};
/** Transport writing call flow:
@@ -434,6 +487,7 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing);
/** Process one slice of incoming data */
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing);
@@ -450,9 +504,11 @@ int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_add_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing);
int grpc_chttp2_list_pop_parsing_seen_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing);
void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success);
void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global);

@@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/internal.h"
#include <string.h>
#include "src/core/transport/chttp2/timeout_encoding.h"
#include <grpc/support/alloc.h>
@@ -50,6 +53,9 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
/* transport_parsing->last_incoming_stream_id is used as the last-stream-id when
sending a GOAWAY frame.
https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
@@ -59,6 +65,84 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g
if (!transport_parsing->is_client) {
transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id;
}
/* TODO(ctiller): re-implement */
GPR_ASSERT(transport_parsing->initial_window_update == 0);
#if 0
while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
int publish = 0;
GPR_ASSERT(s->incoming_sopb);
*s->publish_state =
compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
if (*s->publish_state != s->published_state) {
s->published_state = *s->publish_state;
publish = 1;
if (s->published_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
}
}
if (s->parser.incoming_sopb.nops > 0) {
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
publish = 1;
}
if (publish) {
if (s->incoming_metadata_count > 0) {
patch_metadata_ops(s);
}
s->incoming_sopb = NULL;
schedule_cb(t, s->global.recv_done_closure, 1);
}
}
#endif
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
/* update global settings */
if (transport_parsing->settings_updated) {
memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings));
transport_parsing->settings_updated = 0;
}
/* update settings based on ack if received */
if (transport_parsing->settings_ack_received) {
memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
transport_parsing->settings_ack_received = 0;
}
/* move goaway to the global state if we received one (it will be
published later) */
if (transport_parsing->goaway_received) {
gpr_slice_unref(transport_global->goaway_text);
transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text);
transport_global->goaway_error = transport_parsing->goaway_error;
transport_global->have_goaway = 1;
transport_parsing->goaway_received = 0;
}
/* for each stream that saw an update, fixup global state */
while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) {
/* update incoming flow control window */
if (stream_parsing->incoming_window_delta) {
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global);
}
/* update outgoing flow control window */
if (stream_parsing->outgoing_window_update) {
int was_zero = stream_global->outgoing_window <= 0;
int is_zero;
stream_global->outgoing_window += stream_parsing->outgoing_window_update;
stream_parsing->outgoing_window_update = 0;
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
}
}
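The delta scheme is what keeps the parser off the transport lock: parsing only records how far each window moved, and publish_reads applies those deltas under the lock. For example (hypothetical numbers), a stream with incoming_window = 65535 whose DATA frames consumed 1000 bytes finishes parsing with incoming_window_delta = 1000; publish_reads then drops incoming_window to 64535 and queues the stream so the writer can issue a WINDOW_UPDATE.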
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) {
@@ -555,6 +639,59 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans
grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b);
}
static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) {
grpc_stream_op *ops = stream_global->incoming_sopb->ops;
size_t nops = stream_global->incoming_sopb->nops;
size_t i;
size_t j;
size_t mdidx = 0;
size_t last_mdidx;
int found_metadata = 0;
/* rework the array of metadata into a linked list, making use
of the breadcrumbs we left in metadata batches during
add_metadata_batch */
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
found_metadata = 1;
/* we left a breadcrumb indicating where the end of this list is,
and since we add sequentially, we know from the end of the last
segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx];
op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1];
stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j];
}
stream_parsing->incoming_metadata[mdidx].prev = NULL;
stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL;
/* track where we're up to */
mdidx = last_mdidx;
}
if (found_metadata) {
stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata;
if (mdidx != stream_parsing->incoming_metadata_count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = stream_parsing->incoming_metadata_count - mdidx;
size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count);
stream_parsing->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(stream_parsing->incoming_metadata, stream_parsing->old_incoming_metadata + mdidx,
copy_bytes);
stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count;
} else {
stream_parsing->incoming_metadata = NULL;
stream_parsing->incoming_metadata_count = 0;
stream_parsing->incoming_metadata_capacity = 0;
}
}
}
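A worked illustration of the breadcrumb scheme (hypothetical sizes): if batches of 3 and then 2 metadata elements were parsed, add_metadata_batch left the first metadata op's list.tail holding the integer 3 (smuggled through the pointer) and the second's holding 5; the loop above rewires the first batch to span incoming_metadata[0..2] and the second to span incoming_metadata[3..4], stitching up the prev/next pointers within each span.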
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) {

@@ -66,6 +66,9 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key,
void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map,
gpr_uint32 key);
/* Move all elements of src into dst */
void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst);
/* Return an existing key, or NULL if it does not exist */
void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, gpr_uint32 key);
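grpc_chttp2_stream_map_move_into is the map-level analogue of gpr_slice_buffer_move_into above; recv_data in chttp2_transport.c uses it to fold streams created during parsing back into the map the parser owns:

/* under the transport lock, once parsing has finished: */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
t->global.concurrent_stream_count =
    grpc_chttp2_stream_map_size(&t->parsing_stream_map);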

@@ -138,34 +138,31 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
grpc_mdstr_unref(t->constants.str_grpc_timeout);
grpc_mdstr_unref(t->parsing.str_grpc_timeout);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
GPR_ASSERT(t->lists[i].head == NULL);
GPR_ASSERT(t->lists[i].tail == NULL);
}
GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
grpc_chttp2_stream_map_destroy(&t->stream_map);
grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
gpr_cv_destroy(&t->cv);
/* call back remaining pings: they're not allowed to call into the transport,
and they may hold resources that need to be freed */
for (i = 0; i < t->ping_count; i++) {
t->pings[i].cb(t->pings[i].user_data);
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
grpc_iomgr_add_delayed_callback(ping->on_recv, 0);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
}
gpr_free(t->pings);
for (i = 0; i < t->num_pending_goaways; i++) {
gpr_slice_unref(t->pending_goaways[i].debug);
}
gpr_free(t->pending_goaways);
grpc_sopb_destroy(&t->nuke_later_sopb);
grpc_mdctx_unref(t->metadata_context);
@@ -187,7 +184,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
int j;
grpc_transport_setup_result sr;
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
memset(t, 0, sizeof(*t));
@@ -196,20 +193,20 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
/* one ref is for destroy, the other for when ep becomes NULL */
gpr_ref_init(&t->refs, 2);
gpr_mu_init(&t->mu);
gpr_cv_init(&t->cv);
grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx;
t->constants.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
t->constants.is_client = is_client;
t->endpoint_reading = 1;
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE;
t->global.next_stream_id = is_client ? 1 : 2;
t->global.is_client = is_client;
t->global.outgoing_window = DEFAULT_WINDOW;
t->global.incoming_window = DEFAULT_WINDOW;
t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
t->ping_counter = gpr_now().tv_nsec;
t->global.ping_counter = 1;
t->parsing.is_client = is_client;
t->parsing.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
gpr_slice_buffer_init(&t->global.qbuf);
@@ -222,17 +219,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
grpc_sopb_init(&t->nuke_later_sopb);
if (is_client) {
gpr_slice_buffer_add(&t->global.qbuf,
gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -247,7 +244,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
t->global.sent_local_settings = 0;
/* configure http2 the way we like it */
if (t->constants.is_client) {
if (is_client) {
push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
@@ -257,7 +254,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
for (i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
if (t->constants.is_client) {
if (is_client) {
gpr_log(GPR_ERROR, "%s: is ignored on the client",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
@@ -272,13 +269,13 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
} else if ((t->next_stream_id & 1) !=
} else if ((t->global.next_stream_id & 1) !=
(channel_args->args[i].value.integer & 1)) {
gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
t->constants.is_client ? "client" : "server");
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1,
is_client ? "client" : "server");
} else {
t->next_stream_id = channel_args->args[i].value.integer;
t->global.next_stream_id = channel_args->args[i].value.integer;
}
}
}
@@ -295,7 +292,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
t->channel_callback.cb = sr.callbacks;
t->channel_callback.cb_user_data = sr.user_data;
t->channel_callback.executing = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
ref_transport(t); /* matches unref inside recv_data */
@@ -309,28 +305,9 @@ static void destroy_transport(grpc_transport *gt) {
lock(t);
t->destroying = 1;
/* Wait for pending stuff to finish.
We need to be not calling back to ensure that closed() gets a chance to
trigger if needed during unlock() before we die.
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
while (t->channel_callback.executing || t->writing_active) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
unlock(t);
/* The drop_connection() above puts the transport into an error state, and
the follow-up unlock should then (as part of the cleanup work it does)
ensure that cb is NULL, and therefore not call back anything further.
This check validates this very subtle behavior.
It's the shutdown path, so I don't believe an extra lock pair is going to be
problematic for performance. */
lock(t);
GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED);
unlock(t);
unref_transport(t);
}
@@ -354,7 +331,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
gpr_slice debug_data) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
grpc_chttp2_goaway_append(t->last_incoming_stream_id,
grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(status),
debug_data, &t->global.qbuf);
unlock(t);
@@ -367,41 +344,30 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
memset(s, 0, sizeof(*s));
s->parsing.incoming_deadline = gpr_inf_future;
grpc_sopb_init(&s->writing.sopb);
grpc_chttp2_data_parser_init(&s->parsing.data_parser);
ref_transport(t);
lock(t);
if (!server_data) {
s->global.id = 0;
s->global.outgoing_window = 0;
s->global.incoming_window = 0;
} else {
/* already locked */
if (server_data) {
GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
}
s->incoming_deadline = gpr_inf_future;
grpc_sopb_init(&s->writing.sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
if (initial_op) perform_op_locked(t, s, initial_op);
unlock(t);
return 0;
}
static void schedule_nuke_sopb(grpc_chttp2_transport *t, grpc_stream_op_buffer *sopb) {
grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
sopb->nops = 0;
}
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -409,7 +375,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
@@ -418,15 +384,14 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
GPR_ASSERT(s->global.outgoing_sopb == NULL);
GPR_ASSERT(s->incoming_sopb == NULL);
GPR_ASSERT(s->global.incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing.sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
for (i = 0; i < s->incoming_metadata_count; i++) {
grpc_mdelem_unref(s->incoming_metadata[i].md);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
for (i = 0; i < s->parsing.incoming_metadata_count; i++) {
grpc_mdelem_unref(s->parsing.incoming_metadata[i].md);
}
gpr_free(s->incoming_metadata);
gpr_free(s->old_incoming_metadata);
gpr_free(s->parsing.incoming_metadata);
gpr_free(s->parsing.old_incoming_metadata);
unref_transport(t);
}
@@ -495,14 +460,16 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr
stream_list_add_tail(t, s, id);
}
#if 0
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (s->global.id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d",
t->constants.is_client ? "CLI" : "SVR", s->global.id));
t->global.is_client ? "CLI" : "SVR", s->global.id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) {
maybe_start_some_streams(t);
}
}
#endif
/*
* LOCK MANAGEMENT
@@ -518,7 +485,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) {
if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
ref_transport(t);
schedule_cb(t, &t->writing_action, 1);
@@ -568,15 +535,12 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
}
/* cleanup writing related jazz */
grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing);
grpc_chttp2_cleanup_writing(&t->global, &t->writing);
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing_active = 0;
if (t->destroying) {
gpr_cv_signal(&t->cv);
}
if (!t->reading) {
if (!t->endpoint_reading) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
unref_transport(t); /* safe because we'll still have the ref for write */
@@ -595,50 +559,42 @@ static void writing_action(void *gt, int iomgr_success_ignored) {
static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
if (t->num_pending_goaways == t->cap_pending_goaways) {
t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
t->pending_goaways = gpr_realloc(
t->pending_goaways, sizeof(grpc_chttp2_pending_goaway) * t->cap_pending_goaways);
}
t->pending_goaways[t->num_pending_goaways].status =
grpc_chttp2_http2_error_to_grpc_status(goaway_error);
t->pending_goaways[t->num_pending_goaways].debug = goaway_text;
t->num_pending_goaways++;
gpr_slice_unref(t->channel_callback.goaway_text);
t->channel_callback.have_goaway = 1;
t->channel_callback.goaway_text = goaway_text;
t->channel_callback.goaway_error = goaway_error;
}
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
/* start streams where we have free stream ids and free concurrency */
while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID &&
t->global.concurrent_stream_count <
t->global.settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) return;
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
(s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id));
t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
if (t->next_stream_id == MAX_CLIENT_STREAM_ID) {
if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) {
add_goaway(
t, GRPC_CHTTP2_NO_ERROR,
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
GPR_ASSERT(s->global.id == 0);
s->global.id = t->next_stream_id;
t->next_stream_id += 2;
s->global.id = t->global.next_stream_id;
t->global.next_stream_id += 2;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
t->global.concurrent_stream_count++;
stream_list_join(t, s, WRITABLE);
}
/* cancel out streams that will never be started */
while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) return;
while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
cancel_stream(
t, s, GRPC_STATUS_UNAVAILABLE,
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL,
@@ -646,6 +602,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
}
}
#if 0
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
@@ -665,27 +622,27 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g
if (s->global.id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
t->constants.is_client ? "CLI" : "SVR", s));
t->global.is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->global.outgoing_window > 0) {
stream_list_join(t, s, WRITABLE);
}
} else {
schedule_nuke_sopb(t, op->send_ops);
grpc_sopb_reset(op->send_ops);
schedule_cb(t, s->global.send_done_closure, 0);
}
}
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
GPR_ASSERT(s->global.incoming_sopb == NULL);
GPR_ASSERT(s->global.published_state != GRPC_STREAM_CLOSED);
s->global.recv_done_closure = op->on_done_recv;
s->incoming_sopb = op->recv_ops;
s->incoming_sopb->nops = 0;
s->publish_state = op->recv_state;
gpr_free(s->old_incoming_metadata);
s->old_incoming_metadata = NULL;
s->global.incoming_sopb = op->recv_ops;
s->global.incoming_sopb->nops = 0;
s->global.publish_state = op->recv_state;
gpr_free(s->global.old_incoming_metadata);
s->global.old_incoming_metadata = NULL;
maybe_finish_read(t, s, 0);
maybe_join_window_updates(t, s);
}
@@ -698,6 +655,7 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g
schedule_cb(t, op->on_consumed, 1);
}
}
#endif
static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_op *op) {
@@ -709,28 +667,23 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs,
unlock(t);
}
static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
void *user_data) {
static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_outstanding_ping *p;
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
lock(t);
if (t->ping_capacity == t->ping_count) {
t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
t->pings =
gpr_realloc(t->pings, sizeof(grpc_chttp2_outstanding_ping) * t->ping_capacity);
}
p = &t->pings[t->ping_count++];
p->id[0] = (t->ping_counter >> 56) & 0xff;
p->id[1] = (t->ping_counter >> 48) & 0xff;
p->id[2] = (t->ping_counter >> 40) & 0xff;
p->id[3] = (t->ping_counter >> 32) & 0xff;
p->id[4] = (t->ping_counter >> 24) & 0xff;
p->id[5] = (t->ping_counter >> 16) & 0xff;
p->id[6] = (t->ping_counter >> 8) & 0xff;
p->id[7] = t->ping_counter & 0xff;
p->cb = cb;
p->user_data = user_data;
p->next = &t->global.pings;
p->prev = p->next->prev;
p->prev->next = p->next->prev = p;
p->id[0] = (t->global.ping_counter >> 56) & 0xff;
p->id[1] = (t->global.ping_counter >> 48) & 0xff;
p->id[2] = (t->global.ping_counter >> 40) & 0xff;
p->id[3] = (t->global.ping_counter >> 32) & 0xff;
p->id[4] = (t->global.ping_counter >> 24) & 0xff;
p->id[5] = (t->global.ping_counter >> 16) & 0xff;
p->id[6] = (t->global.ping_counter >> 8) & 0xff;
p->id[7] = t->global.ping_counter & 0xff;
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
unlock(t);
}
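send_ping now threads each outstanding ping onto an intrusive doubly linked list anchored at t->global.pings (the sentinel presumably points at itself after init_transport; that initialization is outside this diff). The eight id bytes are just the 64-bit ping_counter serialized big-endian, equivalent to:

/* equivalent big-endian encoding of the ping payload */
int i;
for (i = 0; i < 8; i++) {
  p->id[i] = (gpr_uint8)((t->global.ping_counter >> (56 - 8 * i)) & 0xff);
}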
@@ -753,6 +706,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
}
}
#if 0
static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
@@ -844,15 +798,17 @@ static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_s
static void end_all_the_calls(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
}
#endif
static void drop_connection(grpc_chttp2_transport *t) {
if (t->error_state == ERROR_STATE_NONE) {
t->error_state = ERROR_STATE_SEEN;
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
}
close_transport_locked(t);
end_all_the_calls(t);
}
#if 0
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser) {
if (is_parser) {
stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE);
@@ -860,6 +816,7 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i
stream_list_join(t, s, FINISHED_READ_OP);
}
}
#endif
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (t->parsing.executing) {
@@ -875,15 +832,16 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre
}
}
#if 0
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
#endif
/* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s;
size_t i;
int keep_reading = 0;
@@ -893,8 +851,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
case GRPC_ENDPOINT_CB_ERROR:
lock(t);
drop_connection(t);
t->reading = 0;
if (!t->writing.executing && t->ep) {
t->endpoint_reading = 0;
if (!t->writing_active && t->ep) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
unref_transport(t); /* safe as we still have a ref for read */
@@ -904,9 +862,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
break;
case GRPC_ENDPOINT_CB_OK:
lock(t);
GPR_ASSERT(!t->parsing.executing);
if (t->error_state == ERROR_STATE_NONE) {
t->parsing.executing = 1;
GPR_ASSERT(!t->parsing_active);
if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
t->parsing_active = 1;
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++)
;
@@ -914,8 +873,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (i != nslices) {
drop_connection(t);
}
t->parsing.executing = 0;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->parsing_active = 0;
}
#if 0
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
@@ -940,6 +905,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->global.outgoing_window += t->global.outgoing_window_update;
t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
#endif
unlock(t);
keep_reading = 1;
break;
@@ -964,92 +930,6 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
static void patch_metadata_ops(grpc_chttp2_stream *s) {
grpc_stream_op *ops = s->incoming_sopb->ops;
size_t nops = s->incoming_sopb->nops;
size_t i;
size_t j;
size_t mdidx = 0;
size_t last_mdidx;
int found_metadata = 0;
/* rework the array of metadata into a linked list, making use
of the breadcrumbs we left in metadata batches during
add_metadata_batch */
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
found_metadata = 1;
/* we left a breadcrumb indicating where the end of this list is,
and since we add sequentially, we know from the end of the last
segment where this segment begins */
last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
GPR_ASSERT(last_mdidx > mdidx);
GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
/* turn the array into a doubly linked list */
op->data.metadata.list.head = &s->incoming_metadata[mdidx];
op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
for (j = mdidx + 1; j < last_mdidx; j++) {
s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1];
s->incoming_metadata[j - 1].next = &s->incoming_metadata[j];
}
s->incoming_metadata[mdidx].prev = NULL;
s->incoming_metadata[last_mdidx - 1].next = NULL;
/* track where we're up to */
mdidx = last_mdidx;
}
if (found_metadata) {
s->old_incoming_metadata = s->incoming_metadata;
if (mdidx != s->incoming_metadata_count) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = s->incoming_metadata_count - mdidx;
size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
GPR_ASSERT(mdidx < s->incoming_metadata_count);
s->incoming_metadata = gpr_malloc(copy_bytes);
memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata,
copy_bytes);
s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
} else {
s->incoming_metadata = NULL;
s->incoming_metadata_count = 0;
s->incoming_metadata_capacity = 0;
}
}
}
static void unlock_check_parser(grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
if (t->parsing.executing) {
return;
}
while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
int publish = 0;
GPR_ASSERT(s->incoming_sopb);
*s->publish_state =
compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
if (*s->publish_state != s->published_state) {
s->published_state = *s->publish_state;
publish = 1;
if (s->published_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
}
}
if (s->parser.incoming_sopb.nops > 0) {
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
publish = 1;
}
if (publish) {
if (s->incoming_metadata_count > 0) {
patch_metadata_ops(s);
}
s->incoming_sopb = NULL;
schedule_cb(t, s->global.recv_done_closure, 1);
}
}
}
typedef struct {
grpc_chttp2_transport *t;
grpc_chttp2_pending_goaway *goaways;
