From 606d874d162a9a254035839701bf6926f681c77b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 15 Jun 2015 06:58:50 -0700 Subject: [PATCH] Progress on splitting reading from transport lock --- include/grpc/support/slice_buffer.h | 2 + src/core/transport/chttp2/internal.h | 132 +++++++--- src/core/transport/chttp2/parsing.c | 137 ++++++++++ src/core/transport/chttp2/stream_map.h | 3 + src/core/transport/chttp2_transport.c | 350 ++++++++----------------- 5 files changed, 351 insertions(+), 273 deletions(-) diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h index 1545dbfd76a..ec048e8c91f 100644 --- a/include/grpc/support/slice_buffer.h +++ b/include/grpc/support/slice_buffer.h @@ -84,6 +84,8 @@ void gpr_slice_buffer_pop(gpr_slice_buffer *sb); void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb); /* swap the contents of two slice buffers */ void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b); +/* move all of the elements of src into dst */ +void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst); #ifdef __cplusplus } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index f89d327f8cb..ffadd047621 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -143,9 +143,9 @@ typedef struct { } grpc_chttp2_stream_link; typedef enum { - ERROR_STATE_NONE, - ERROR_STATE_SEEN, - ERROR_STATE_NOTIFIED + GRPC_CHTTP2_ERROR_STATE_NONE, + GRPC_CHTTP2_ERROR_STATE_SEEN, + GRPC_CHTTP2_ERROR_STATE_NOTIFIED } grpc_chttp2_error_state; /* We keep several sets of connection wide parameters */ @@ -198,11 +198,32 @@ typedef struct { /** settings values */ gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + /** has there been a connection level error, and have we notified + anyone about it? */ + grpc_chttp2_error_state error_state; + + /** what is the next stream id to be allocated by this peer? + copied to next_stream_id in parsing when parsing commences */ + gpr_uint32 next_stream_id; + /** last received stream id */ gpr_uint32 last_incoming_stream_id; /** pings awaiting responses */ grpc_chttp2_outstanding_ping pings; + /** next payload for an outgoing ping */ + gpr_uint64 ping_counter; + + /** concurrent stream count: updated when not parsing, + so this is a strict over-estimation on the client */ + gpr_uint32 concurrent_stream_count; + + /** is there a goaway available? */ + gpr_uint8 have_goaway; + /** what is the debug text of the goaway? */ + gpr_slice goaway_text; + /** what is the status code of the goaway? */ + grpc_status_code goaway_error; } grpc_chttp2_transport_global; typedef struct { @@ -279,7 +300,6 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_outstanding_ping pings; }; - struct grpc_chttp2_transport { grpc_transport base; /* must be first */ grpc_endpoint *ep; @@ -287,18 +307,67 @@ struct grpc_chttp2_transport { gpr_refcount refs; gpr_mu mu; - gpr_cv cv; + + /** is the transport destroying itself? */ + gpr_uint8 destroying; + /** has the upper layer closed the transport? */ + gpr_uint8 closed; /** is a thread currently writing */ gpr_uint8 writing_active; + /** is a thread currently parsing */ + gpr_uint8 parsing_active; + /** is there a read request to the endpoint outstanding? */ + gpr_uint8 endpoint_reading; + + /** various lists of streams */ + grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; + + /** global state for reading/writing */ + grpc_chttp2_transport_global global; + /** state only accessible by the chain of execution that + set writing_active=1 */ + grpc_chttp2_transport_writing writing; + /** state only accessible by the chain of execution that + set parsing_active=1 */ + grpc_chttp2_transport_parsing parsing; + + /** maps stream id to grpc_chttp2_stream objects; + owned by the parsing thread when parsing */ + grpc_chttp2_stream_map parsing_stream_map; + + /** streams created by the client (possibly during parsing); + merged with parsing_stream_map during unlock when no + parsing is occurring */ + grpc_chttp2_stream_map new_stream_map; + + /** closure to execute writing */ + grpc_iomgr_closure writing_action; + + /** address to place a newly accepted stream - set and unset by + grpc_chttp2_parsing_accept_stream; used by init_stream to + publish the accepted server stream */ + grpc_chttp2_stream **accepting_stream; + + struct { + /** is a thread currently performing channel callbacks */ + gpr_uint8 executing; + /** transport channel-level callback */ + const grpc_transport_callbacks *cb; + /** user data for cb calls */ + void *cb_user_data; + /** closure for notifying transport closure */ + grpc_iomgr_closure notify_closed; + } channel_callback; + +#if 0 /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; /** are we calling back any grpc_transport_op completion events */ gpr_uint8 calling_back_ops; gpr_uint8 destroying; gpr_uint8 closed; - grpc_chttp2_error_state error_state; /* stream indexing */ gpr_uint32 next_stream_id; @@ -306,40 +375,19 @@ struct grpc_chttp2_transport { /* window management */ gpr_uint32 outgoing_window_update; - /* goaway */ - grpc_chttp2_pending_goaway *pending_goaways; - size_t num_pending_goaways; - size_t cap_pending_goaways; - /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; /* stream ops that need to be destroyed, but outside of the lock */ grpc_stream_op_buffer nuke_later_sopb; - grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; - grpc_chttp2_stream_map stream_map; - /* pings */ gpr_int64 ping_counter; - grpc_chttp2_transport_global global; - grpc_chttp2_transport_writing writing; - grpc_chttp2_transport_parsing parsing; - /** closure to execute writing */ - grpc_iomgr_closure writing_action; + grpc_chttp2_stream **accepting_stream; - struct { - /** is a thread currently performing channel callbacks */ - gpr_uint8 executing; - /** transport channel-level callback */ - const grpc_transport_callbacks *cb; - /** user data for cb calls */ - void *cb_user_data; - /** closure for notifying transport closure */ - grpc_iomgr_closure notify_closed; - } channel_callback; +#endif }; typedef struct { @@ -361,6 +409,13 @@ typedef struct { grpc_chttp2_write_state write_state; /** is this stream closed (boolean) */ gpr_uint8 read_closed; + + /** stream state already published to the upper layer */ + grpc_stream_state published_state; + /** address to publish next stream state to */ + grpc_stream_state *publish_state; + /** pointer to sop buffer to fill in with new stream ops */ + grpc_stream_op_buffer *incoming_sopb; } grpc_chttp2_stream_global; typedef struct { @@ -377,12 +432,12 @@ struct grpc_chttp2_stream_parsing { gpr_uint32 id; /** has this stream received a close */ gpr_uint8 received_close; - /** incoming_window has been reduced during parsing */ - gpr_uint8 incoming_window_changed; /** saw an error on this stream during parsing (it should be cancelled) */ gpr_uint8 saw_error; /** saw a rst_stream */ gpr_uint8 saw_rst_stream; + /** incoming_window has been reduced by this much during parsing */ + gpr_uint32 incoming_window_delta; /** window available for peer to send to us */ gpr_uint32 incoming_window; /** parsing state for data frames */ @@ -403,20 +458,18 @@ struct grpc_chttp2_stream_parsing { struct grpc_chttp2_stream { grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; - - gpr_uint32 outgoing_window_update; - gpr_uint8 cancelled; + grpc_chttp2_stream_parsing parsing; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; - /* sops from application */ - grpc_stream_op_buffer *incoming_sopb; - grpc_stream_state *publish_state; - grpc_stream_state published_state; +#if 0 + gpr_uint32 outgoing_window_update; + gpr_uint8 cancelled; grpc_stream_state callback_state; grpc_stream_op_buffer callback_sopb; +#endif }; /** Transport writing call flow: @@ -434,6 +487,7 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); /** Process one slice of incoming data */ +void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); @@ -450,9 +504,11 @@ int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport_writing *transport void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); +void grpc_chttp2_list_add_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); void grpc_chttp2_list_add_parsing_seen_stream(grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); +int grpc_chttp2_list_pop_parsing_seen_stream(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_parsing **stream_parsing); void grpc_chttp2_schedule_closure(grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure, int success); void grpc_chttp2_read_write_state_changed(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index e2c39bec3c9..9f501941259 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/chttp2/internal.h" + +#include + #include "src/core/transport/chttp2/timeout_encoding.h" #include @@ -50,6 +53,9 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_parsing *stream_parsing; + /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when sending GOAWAY frame. https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 @@ -59,6 +65,84 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g if (!transport_parsing->is_client) { transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id; } + + /* TODO(ctiller): re-implement */ + GPR_ASSERT(transport_parsing->initial_window_update == 0); + +#if 0 + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = + compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + if (s->published_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + } + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; + } + if (publish) { + if (s->incoming_metadata_count > 0) { + patch_metadata_ops(s); + } + s->incoming_sopb = NULL; + schedule_cb(t, s->global.recv_done_closure, 1); + } + } +#endif + + /* copy parsing qbuf to global qbuf */ + gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); + + /* update global settings */ + if (transport_parsing->settings_updated) { + memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings)); + transport_parsing->settings_updated = 0; + } + + /* update settings based on ack if received */ + if (transport_parsing->settings_ack_received) { + memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); + transport_parsing->settings_ack_received = 0; + } + + /* move goaway to the global state if we received one (it will be + published later */ + if (transport_parsing->goaway_received) { + gpr_slice_unref(transport_global->goaway_text); + transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text); + transport_global->goaway_error = transport_parsing->goaway_error; + transport_global->have_goaway = 1; + transport_parsing->goaway_received = 0; + } + + /* for each stream that saw an update, fixup global state */ + while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) { + /* update incoming flow control window */ + if (stream_parsing->incoming_window_delta) { + stream_global->incoming_window -= stream_parsing->incoming_window_delta; + stream_parsing->incoming_window_delta = 0; + grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); + } + + /* update outgoing flow control window */ + if (stream_parsing->outgoing_window_update) { + int was_zero = stream_global->outgoing_window <= 0; + int is_zero; + stream_global->outgoing_window += stream_parsing->outgoing_window_update; + stream_parsing->outgoing_window_update = 0; + is_zero = stream_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + } } int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) { @@ -555,6 +639,59 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); } +static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) { + grpc_stream_op *ops = stream_global->incoming_sopb->ops; + size_t nops = stream_global->incoming_sopb->nops; + size_t i; + size_t j; + size_t mdidx = 0; + size_t last_mdidx; + int found_metadata = 0; + + /* rework the array of metadata into a linked list, making use + of the breadcrumbs we left in metadata batches during + add_metadata_batch */ + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + found_metadata = 1; + /* we left a breadcrumb indicating where the end of this list is, + and since we add sequentially, we know from the end of the last + segment where this segment begins */ + last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); + GPR_ASSERT(last_mdidx > mdidx); + GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count); + /* turn the array into a doubly linked list */ + op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx]; + op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1]; + for (j = mdidx + 1; j < last_mdidx; j++) { + stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1]; + stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j]; + } + stream_parsing->incoming_metadata[mdidx].prev = NULL; + stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL; + /* track where we're up to */ + mdidx = last_mdidx; + } + if (found_metadata) { + stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata; + if (mdidx != stream_parsing->incoming_metadata_count) { + /* we have a partially read metadata batch still in incoming_metadata */ + size_t new_count = stream_parsing->incoming_metadata_count - mdidx; + size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count; + GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count); + stream_parsing->incoming_metadata = gpr_malloc(copy_bytes); + memcpy(stream_parsing->old_incoming_metadata + mdidx, stream_parsing->incoming_metadata, + copy_bytes); + stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count; + } else { + stream_parsing->incoming_metadata = NULL; + stream_parsing->incoming_metadata_count = 0; + stream_parsing->incoming_metadata_capacity = 0; + } + } +} + static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) { diff --git a/src/core/transport/chttp2/stream_map.h b/src/core/transport/chttp2/stream_map.h index d338d2f8921..f59dece7467 100644 --- a/src/core/transport/chttp2/stream_map.h +++ b/src/core/transport/chttp2/stream_map.h @@ -66,6 +66,9 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, gpr_uint32 key, void *grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map *map, gpr_uint32 key); +/* Move all elements of src into dst */ +void grpc_chttp2_stream_map_move_into(grpc_chttp2_stream_map *src, grpc_chttp2_stream_map *dst); + /* Return an existing key, or NULL if it does not exist */ void *grpc_chttp2_stream_map_find(grpc_chttp2_stream_map *map, gpr_uint32 key); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 13ddeacc028..5db9b92727f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -138,34 +138,31 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); - grpc_mdstr_unref(t->constants.str_grpc_timeout); + grpc_mdstr_unref(t->parsing.str_grpc_timeout); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); GPR_ASSERT(t->lists[i].tail == NULL); } - GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0); - grpc_chttp2_stream_map_destroy(&t->stream_map); + grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); + grpc_chttp2_stream_map_destroy(&t->new_stream_map); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); - gpr_cv_destroy(&t->cv); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ - for (i = 0; i < t->ping_count; i++) { - t->pings[i].cb(t->pings[i].user_data); + while (t->global.pings.next != &t->global.pings) { + grpc_chttp2_outstanding_ping *ping = t->global.pings.next; + grpc_iomgr_add_delayed_callback(ping->on_recv, 0); + ping->next->prev = ping->prev; + ping->prev->next = ping->next; + gpr_free(ping); } - gpr_free(t->pings); - - for (i = 0; i < t->num_pending_goaways; i++) { - gpr_slice_unref(t->pending_goaways[i].debug); - } - gpr_free(t->pending_goaways); - - grpc_sopb_destroy(&t->nuke_later_sopb); grpc_mdctx_unref(t->metadata_context); @@ -187,7 +184,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba int j; grpc_transport_setup_result sr; - GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); memset(t, 0, sizeof(*t)); @@ -196,20 +193,20 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); - gpr_cv_init(&t->cv); grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; - t->constants.str_grpc_timeout = - grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); - t->reading = 1; - t->error_state = ERROR_STATE_NONE; - t->next_stream_id = is_client ? 1 : 2; - t->constants.is_client = is_client; + t->endpoint_reading = 1; + t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE; + t->global.next_stream_id = is_client ? 1 : 2; + t->global.is_client = is_client; t->global.outgoing_window = DEFAULT_WINDOW; t->global.incoming_window = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; - t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->ping_counter = gpr_now().tv_nsec; + t->global.ping_counter = 1; + t->parsing.is_client = is_client; + t->parsing.str_grpc_timeout = + grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); + t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; gpr_slice_buffer_init(&t->global.qbuf); @@ -222,17 +219,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t); - grpc_sopb_init(&t->nuke_later_sopb); if (is_client) { gpr_slice_buffer_add(&t->global.qbuf, - gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); + gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's needed. TODO(ctiller): tune this */ - grpc_chttp2_stream_map_init(&t->stream_map, 8); + grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8); + grpc_chttp2_stream_map_init(&t->new_stream_map, 8); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -247,7 +244,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba t->global.sent_local_settings = 0; /* configure http2 the way we like it */ - if (t->constants.is_client) { + if (is_client) { push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } @@ -257,7 +254,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) { - if (t->constants.is_client) { + if (is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { @@ -272,13 +269,13 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba if (channel_args->args[i].type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER); - } else if ((t->next_stream_id & 1) != + } else if ((t->global.next_stream_id & 1) != (channel_args->args[i].value.integer & 1)) { gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, - t->constants.is_client ? "client" : "server"); + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1, + is_client ? "client" : "server"); } else { - t->next_stream_id = channel_args->args[i].value.integer; + t->global.next_stream_id = channel_args->args[i].value.integer; } } } @@ -295,7 +292,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba t->channel_callback.cb = sr.callbacks; t->channel_callback.cb_user_data = sr.user_data; t->channel_callback.executing = 0; - if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); ref_transport(t); /* matches unref inside recv_data */ @@ -309,28 +305,9 @@ static void destroy_transport(grpc_transport *gt) { lock(t); t->destroying = 1; - /* Wait for pending stuff to finish. - We need to be not calling back to ensure that closed() gets a chance to - trigger if needed during unlock() before we die. - We need to be not writing as cancellation finalization may produce some - callbacks that NEED to be made to close out some streams when t->writing - becomes 0. */ - while (t->channel_callback.executing || t->writing_active) { - gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); - } drop_connection(t); unlock(t); - /* The drop_connection() above puts the grpc_chttp2_transport into an error state, and - the follow-up unlock should then (as part of the cleanup work it does) - ensure that cb is NULL, and therefore not call back anything further. - This check validates this very subtle behavior. - It's shutdown path, so I don't believe an extra lock pair is going to be - problematic for performance. */ - lock(t); - GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED); - unlock(t); - unref_transport(t); } @@ -354,7 +331,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status, gpr_slice debug_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; lock(t); - grpc_chttp2_goaway_append(t->last_incoming_stream_id, + grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(status), debug_data, &t->global.qbuf); unlock(t); @@ -367,41 +344,30 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, memset(s, 0, sizeof(*s)); + s->parsing.incoming_deadline = gpr_inf_future; + grpc_sopb_init(&s->writing.sopb); + grpc_chttp2_data_parser_init(&s->parsing.data_parser); + ref_transport(t); lock(t); - if (!server_data) { - s->global.id = 0; - s->global.outgoing_window = 0; - s->global.incoming_window = 0; - } else { - /* already locked */ + if (server_data) { + GPR_ASSERT(t->parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; s->global.outgoing_window = t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->global.incoming_window = t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - t->incoming_stream = s; - grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); + *t->accepting_stream = s; + grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); } - s->incoming_deadline = gpr_inf_future; - grpc_sopb_init(&s->writing.sopb); - grpc_sopb_init(&s->callback_sopb); - grpc_chttp2_data_parser_init(&s->parser); - if (initial_op) perform_op_locked(t, s, initial_op); - unlock(t); return 0; } -static void schedule_nuke_sopb(grpc_chttp2_transport *t, grpc_stream_op_buffer *sopb) { - grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops); - sopb->nops = 0; -} - static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -409,7 +375,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_lock(&t->mu); - GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0); + GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); for (i = 0; i < STREAM_LIST_COUNT; i++) { stream_list_remove(t, s, i); @@ -418,15 +384,14 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); GPR_ASSERT(s->global.outgoing_sopb == NULL); - GPR_ASSERT(s->incoming_sopb == NULL); + GPR_ASSERT(s->global.incoming_sopb == NULL); grpc_sopb_destroy(&s->writing.sopb); - grpc_sopb_destroy(&s->callback_sopb); - grpc_chttp2_data_parser_destroy(&s->parser); - for (i = 0; i < s->incoming_metadata_count; i++) { - grpc_mdelem_unref(s->incoming_metadata[i].md); + grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); + for (i = 0; i < s->parsing.incoming_metadata_count; i++) { + grpc_mdelem_unref(s->parsing.incoming_metadata[i].md); } - gpr_free(s->incoming_metadata); - gpr_free(s->old_incoming_metadata); + gpr_free(s->parsing.incoming_metadata); + gpr_free(s->parsing.old_incoming_metadata); unref_transport(t); } @@ -495,14 +460,16 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr stream_list_add_tail(t, s, id); } +#if 0 static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (s->global.id == 0) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d", - t->constants.is_client ? "CLI" : "SVR", s->global.id)); + t->global.is_client ? "CLI" : "SVR", s->global.id)); if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) { maybe_start_some_streams(t); } } +#endif /* * LOCK MANAGEMENT @@ -518,7 +485,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; - if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) { + if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; ref_transport(t); schedule_cb(t, &t->writing_action, 1); @@ -568,15 +535,12 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ } /* cleanup writing related jazz */ - grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing); + grpc_chttp2_cleanup_writing(&t->global, &t->writing); /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; - if (t->destroying) { - gpr_cv_signal(&t->cv); - } - if (!t->reading) { + if (!t->endpoint_reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; unref_transport(t); /* safe because we'll still have the ref for write */ @@ -595,50 +559,42 @@ static void writing_action(void *gt, int iomgr_success_ignored) { static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { - if (t->num_pending_goaways == t->cap_pending_goaways) { - t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); - t->pending_goaways = gpr_realloc( - t->pending_goaways, sizeof(grpc_chttp2_pending_goaway) * t->cap_pending_goaways); - } - t->pending_goaways[t->num_pending_goaways].status = - grpc_chttp2_http2_error_to_grpc_status(goaway_error); - t->pending_goaways[t->num_pending_goaways].debug = goaway_text; - t->num_pending_goaways++; + gpr_slice_unref(t->channel_callback.goaway_text); + t->channel_callback.have_goaway = 1; + t->channel_callback.goaway_text = goaway_text; + t->channel_callback.goaway_error = goaway_error; } static void maybe_start_some_streams(grpc_chttp2_transport *t) { + grpc_chttp2_stream *s; /* start streams where we have free grpc_chttp2_stream ids and free concurrency */ - while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && - grpc_chttp2_stream_map_size(&t->stream_map) < + while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID && + t->global.concurrent_stream_count < t->global.settings[PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { - grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) return; - + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && + (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) { IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", - t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id)); + t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id)); - if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { + if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) { add_goaway( t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); } GPR_ASSERT(s->global.id == 0); - s->global.id = t->next_stream_id; - t->next_stream_id += 2; + s->global.id = t->global.next_stream_id; + t->global.next_stream_id += 2; s->global.outgoing_window = t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->global.incoming_window = t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); + grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); + t->global.concurrent_stream_count++; stream_list_join(t, s, WRITABLE); } /* cancel out streams that will never be started */ - while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { - grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) return; - + while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) { cancel_stream( t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, @@ -646,6 +602,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) { } } +#if 0 static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_stream( @@ -665,27 +622,27 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g if (s->global.id == 0) { IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - t->constants.is_client ? "CLI" : "SVR", s)); + t->global.is_client ? "CLI" : "SVR", s)); stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); } else if (s->global.outgoing_window > 0) { stream_list_join(t, s, WRITABLE); } } else { - schedule_nuke_sopb(t, op->send_ops); + grpc_sopb_reset(op->send_ops); schedule_cb(t, s->global.send_done_closure, 0); } } if (op->recv_ops) { - GPR_ASSERT(s->incoming_sopb == NULL); - GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); + GPR_ASSERT(s->global.incoming_sopb == NULL); + GPR_ASSERT(s->global.published_state != GRPC_STREAM_CLOSED); s->global.recv_done_closure = op->on_done_recv; - s->incoming_sopb = op->recv_ops; - s->incoming_sopb->nops = 0; - s->publish_state = op->recv_state; - gpr_free(s->old_incoming_metadata); - s->old_incoming_metadata = NULL; + s->global.incoming_sopb = op->recv_ops; + s->global.incoming_sopb->nops = 0; + s->global.publish_state = op->recv_state; + gpr_free(s->global.old_incoming_metadata); + s->global.old_incoming_metadata = NULL; maybe_finish_read(t, s, 0); maybe_join_window_updates(t, s); } @@ -698,6 +655,7 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g schedule_cb(t, op->on_consumed, 1); } } +#endif static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { @@ -709,28 +667,23 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, unlock(t); } -static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), - void *user_data) { +static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_outstanding_ping *p; + grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); lock(t); - if (t->ping_capacity == t->ping_count) { - t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2); - t->pings = - gpr_realloc(t->pings, sizeof(grpc_chttp2_outstanding_ping) * t->ping_capacity); - } - p = &t->pings[t->ping_count++]; - p->id[0] = (t->ping_counter >> 56) & 0xff; - p->id[1] = (t->ping_counter >> 48) & 0xff; - p->id[2] = (t->ping_counter >> 40) & 0xff; - p->id[3] = (t->ping_counter >> 32) & 0xff; - p->id[4] = (t->ping_counter >> 24) & 0xff; - p->id[5] = (t->ping_counter >> 16) & 0xff; - p->id[6] = (t->ping_counter >> 8) & 0xff; - p->id[7] = t->ping_counter & 0xff; - p->cb = cb; - p->user_data = user_data; + p->next = &t->global.pings; + p->prev = p->next->prev; + p->prev->next = p->next->prev = p; + p->id[0] = (t->global.ping_counter >> 56) & 0xff; + p->id[1] = (t->global.ping_counter >> 48) & 0xff; + p->id[2] = (t->global.ping_counter >> 40) & 0xff; + p->id[3] = (t->global.ping_counter >> 32) & 0xff; + p->id[4] = (t->global.ping_counter >> 24) & 0xff; + p->id[5] = (t->global.ping_counter >> 16) & 0xff; + p->id[6] = (t->global.ping_counter >> 8) & 0xff; + p->id[7] = t->global.ping_counter & 0xff; + p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); unlock(t); } @@ -753,6 +706,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { } } +#if 0 static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, @@ -844,15 +798,17 @@ static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_s static void end_all_the_calls(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t); } +#endif static void drop_connection(grpc_chttp2_transport *t) { - if (t->error_state == ERROR_STATE_NONE) { - t->error_state = ERROR_STATE_SEEN; + if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; } close_transport_locked(t); end_all_the_calls(t); } +#if 0 static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser) { if (is_parser) { stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE); @@ -860,6 +816,7 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i stream_list_join(t, s, FINISHED_READ_OP); } } +#endif static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (t->parsing.executing) { @@ -875,15 +832,16 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre } } +#if 0 static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) { return grpc_chttp2_stream_map_find(&t->stream_map, id); } +#endif /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { grpc_chttp2_transport *t = tp; - grpc_chttp2_stream *s; size_t i; int keep_reading = 0; @@ -893,8 +851,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, case GRPC_ENDPOINT_CB_ERROR: lock(t); drop_connection(t); - t->reading = 0; - if (!t->writing.executing && t->ep) { + t->endpoint_reading = 0; + if (!t->writing_active && t->ep) { grpc_endpoint_destroy(t->ep); t->ep = NULL; unref_transport(t); /* safe as we still have a ref for read */ @@ -904,9 +862,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, break; case GRPC_ENDPOINT_CB_OK: lock(t); - GPR_ASSERT(!t->parsing.executing); - if (t->error_state == ERROR_STATE_NONE) { - t->parsing.executing = 1; + GPR_ASSERT(!t->parsing_active); + if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + t->parsing_active = 1; + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); gpr_mu_unlock(&t->mu); for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++) ; @@ -914,8 +873,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (i != nslices) { drop_connection(t); } - t->parsing.executing = 0; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); + t->global.concurrent_stream_count = grpc_stream_map_size(&t->parsing_stream_map); + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; } +#if 0 while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { maybe_finish_read(t, s, 0); } @@ -940,6 +905,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, t->global.outgoing_window += t->global.outgoing_window_update; t->global.outgoing_window_update = 0; maybe_start_some_streams(t); +#endif unlock(t); keep_reading = 1; break; @@ -964,92 +930,6 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } -static void patch_metadata_ops(grpc_chttp2_stream *s) { - grpc_stream_op *ops = s->incoming_sopb->ops; - size_t nops = s->incoming_sopb->nops; - size_t i; - size_t j; - size_t mdidx = 0; - size_t last_mdidx; - int found_metadata = 0; - - /* rework the array of metadata into a linked list, making use - of the breadcrumbs we left in metadata batches during - add_metadata_batch */ - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - found_metadata = 1; - /* we left a breadcrumb indicating where the end of this list is, - and since we add sequentially, we know from the end of the last - segment where this segment begins */ - last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); - GPR_ASSERT(last_mdidx > mdidx); - GPR_ASSERT(last_mdidx <= s->incoming_metadata_count); - /* turn the array into a doubly linked list */ - op->data.metadata.list.head = &s->incoming_metadata[mdidx]; - op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; - for (j = mdidx + 1; j < last_mdidx; j++) { - s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1]; - s->incoming_metadata[j - 1].next = &s->incoming_metadata[j]; - } - s->incoming_metadata[mdidx].prev = NULL; - s->incoming_metadata[last_mdidx - 1].next = NULL; - /* track where we're up to */ - mdidx = last_mdidx; - } - if (found_metadata) { - s->old_incoming_metadata = s->incoming_metadata; - if (mdidx != s->incoming_metadata_count) { - /* we have a partially read metadata batch still in incoming_metadata */ - size_t new_count = s->incoming_metadata_count - mdidx; - size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; - GPR_ASSERT(mdidx < s->incoming_metadata_count); - s->incoming_metadata = gpr_malloc(copy_bytes); - memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, - copy_bytes); - s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; - } else { - s->incoming_metadata = NULL; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; - } - } -} - -static void unlock_check_parser(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - - if (t->parsing.executing) { - return; - } - - while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { - int publish = 0; - GPR_ASSERT(s->incoming_sopb); - *s->publish_state = - compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); - if (*s->publish_state != s->published_state) { - s->published_state = *s->publish_state; - publish = 1; - if (s->published_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - } - } - if (s->parser.incoming_sopb.nops > 0) { - grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); - publish = 1; - } - if (publish) { - if (s->incoming_metadata_count > 0) { - patch_metadata_ops(s); - } - s->incoming_sopb = NULL; - schedule_cb(t, s->global.recv_done_closure, 1); - } - } -} - typedef struct { grpc_chttp2_transport *t; grpc_chttp2_pending_goaway *goaways;