diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ee394bb33a7..341ca2942c8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -114,7 +114,6 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; - size_t num_calls; grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; @@ -183,10 +182,6 @@ typedef enum { struct call_data { grpc_call *call; - /** is this call counted towards the channels total - number of calls? */ - gpr_uint8 active; - call_state state; grpc_mdstr *path; grpc_mdstr *host; @@ -208,9 +203,7 @@ struct call_data { typedef struct { grpc_channel **channels; - grpc_channel **disconnects; size_t num_channels; - size_t num_disconnects; } channel_broadcaster; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -229,26 +222,15 @@ static void maybe_finish_shutdown(grpc_server *server); static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; - size_t dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { count++; - if (c->num_calls == 0) { - dc_count++; - } } cb->num_channels = count; - cb->num_disconnects = dc_count; cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); - cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; - dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); - if (c->num_calls == 0) { - cb->disconnects[dc_count++] = c->channel; - GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); - } } } @@ -287,28 +269,11 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goaway, int force_disconnect) { size_t i; - if (send_goaway) { - for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 1, 0); - } - } - if (force_disconnect) { - for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 0, 1); - } - } else { - for (i = 0; i < cb->num_disconnects; i++) { - send_shutdown(cb->disconnects[i], 0, 1); - } - } for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], send_goaway, force_disconnect); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } - for (i = 0; i < cb->num_disconnects; i++) { - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); - } gpr_free(cb->channels); - gpr_free(cb->disconnects); } /* call list */ @@ -548,22 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static int decrement_call_count(channel_data *chand) { - int disconnect = 0; - chand->num_calls--; - if (0 == chand->num_calls && chand->server->shutdown) { - disconnect = 1; - } - maybe_finish_shutdown(chand->server); - return disconnect; -} - static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - int remove_res; - int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -608,21 +561,7 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - remove_res = calld->active; - calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); - gpr_mu_lock(&chand->server->mu_global); - if (remove_res) { - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - } - gpr_mu_unlock(&chand->server->mu_global); - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); - } break; } @@ -684,14 +623,9 @@ static void init_call_elem(grpc_call_element *elem, memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); - calld->active = 1; grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); - gpr_mu_lock(&chand->server->mu_global); - chand->num_calls++; - gpr_mu_unlock(&chand->server->mu_global); - server_ref(chand->server); if (initial_op) server_mutate_op(elem, initial_op); @@ -700,30 +634,13 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int disconnect = 0; - int active; size_t i; gpr_mu_lock(&chand->server->mu_call); for (i = 0; i < CALL_LIST_COUNT; i++) { call_list_remove(elem->call_data, i); } - active = calld->active; - calld->active = 0; gpr_mu_unlock(&chand->server->mu_call); - if (active) { - gpr_mu_lock(&chand->server->mu_global); - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - gpr_mu_unlock(&chand->server->mu_global); - } - - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); - } if (calld->host) { grpc_mdstr_unref(calld->host); @@ -743,7 +660,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(!is_last); chand->server = NULL; - chand->num_calls = 0; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f98a5bd711..bdd4b432ebd 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -173,6 +173,8 @@ typedef struct { /** have we seen a goaway */ gpr_uint8 seen_goaway; + /** have we sent a goaway */ + gpr_uint8 sent_goaway; /** is this transport a client? */ gpr_uint8 is_client; @@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +/* returns 1 if this is the last stream, 0 otherwise */ +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; +int grpc_chttp2_has_streams(grpc_chttp2_transport *t); void grpc_chttp2_for_all_streams( grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 85691b32d26..4fea058c193 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); +} + +int grpc_chttp2_has_streams(grpc_chttp2_transport *t) { + return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } void grpc_chttp2_for_all_streams( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0a7b8f5bf9a..d955de88654 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -385,7 +385,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); - grpc_chttp2_unregister_stream(t, s); + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); @@ -684,10 +686,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->send_goaway) { + t->global.sent_goaway = 1; grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), *op->goaway_message, &t->global.qbuf); + if (!grpc_chttp2_has_streams(t)) { + close_transport_locked(t); + } } if (op->set_accept_stream != NULL) { @@ -736,6 +742,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(&t->parsing); } + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 579bcc943fa..1429737721c 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -91,7 +91,9 @@ typedef struct grpc_transport_op { grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ int disconnect; - /** should we send a goaway? */ + /** should we send a goaway? + after a goaway is sent, once there are no more active calls on + the transport, the transport should disconnect */ int send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status;