Change transport contract to automatically disconnect after sending a goaway

iff there are no calls left - lets us remove this tracking from the server where it required a server-wide lock, and instead do the processing under the transport lock which parallelizes much more cleanly.
pull/2304/head
Craig Tiller 10 years ago
parent 092d8d1b7f
commit 9188d7a2df
  1. 86
      src/core/surface/server.c
  2. 8
      src/core/transport/chttp2/internal.h
  3. 9
      src/core/transport/chttp2/stream_lists.c
  4. 11
      src/core/transport/chttp2_transport.c
  5. 4
      src/core/transport/transport.h

@ -114,7 +114,6 @@ typedef struct channel_registered_method {
struct channel_data { struct channel_data {
grpc_server *server; grpc_server *server;
size_t num_calls;
grpc_connectivity_state connectivity_state; grpc_connectivity_state connectivity_state;
grpc_channel *channel; grpc_channel *channel;
grpc_mdstr *path_key; grpc_mdstr *path_key;
@ -183,10 +182,6 @@ typedef enum {
struct call_data { struct call_data {
grpc_call *call; grpc_call *call;
/** is this call counted towards the channels total
number of calls? */
gpr_uint8 active;
call_state state; call_state state;
grpc_mdstr *path; grpc_mdstr *path;
grpc_mdstr *host; grpc_mdstr *host;
@ -208,9 +203,7 @@ struct call_data {
typedef struct { typedef struct {
grpc_channel **channels; grpc_channel **channels;
grpc_channel **disconnects;
size_t num_channels; size_t num_channels;
size_t num_disconnects;
} channel_broadcaster; } channel_broadcaster;
#define SERVER_FROM_CALL_ELEM(elem) \ #define SERVER_FROM_CALL_ELEM(elem) \
@ -229,26 +222,15 @@ static void maybe_finish_shutdown(grpc_server *server);
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
channel_data *c; channel_data *c;
size_t count = 0; size_t count = 0;
size_t dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
count++; count++;
if (c->num_calls == 0) {
dc_count++;
}
} }
cb->num_channels = count; cb->num_channels = count;
cb->num_disconnects = dc_count;
cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
count = 0; count = 0;
dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel; cb->channels[count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
if (c->num_calls == 0) {
cb->disconnects[dc_count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
}
} }
} }
@ -287,28 +269,11 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
int send_goaway, int force_disconnect) { int send_goaway, int force_disconnect) {
size_t i; size_t i;
if (send_goaway) {
for (i = 0; i < cb->num_channels; i++) {
send_shutdown(cb->channels[i], 1, 0);
}
}
if (force_disconnect) {
for (i = 0; i < cb->num_channels; i++) {
send_shutdown(cb->channels[i], 0, 1);
}
} else {
for (i = 0; i < cb->num_disconnects; i++) {
send_shutdown(cb->disconnects[i], 0, 1);
}
}
for (i = 0; i < cb->num_channels; i++) { for (i = 0; i < cb->num_channels; i++) {
send_shutdown(cb->channels[i], send_goaway, force_disconnect);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
} }
for (i = 0; i < cb->num_disconnects; i++) {
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
}
gpr_free(cb->channels); gpr_free(cb->channels);
gpr_free(cb->disconnects);
} }
/* call list */ /* call list */
@ -548,22 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md; return md;
} }
static int decrement_call_count(channel_data *chand) {
int disconnect = 0;
chand->num_calls--;
if (0 == chand->num_calls && chand->server->shutdown) {
disconnect = 1;
}
maybe_finish_shutdown(chand->server);
return disconnect;
}
static void server_on_recv(void *ptr, int success) { static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr; grpc_call_element *elem = ptr;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
int remove_res;
int disconnect = 0;
if (success && !calld->got_initial_metadata) { if (success && !calld->got_initial_metadata) {
size_t i; size_t i;
@ -608,21 +561,7 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} }
remove_res = calld->active;
calld->active = 0;
gpr_mu_unlock(&chand->server->mu_call); gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
if (remove_res) {
disconnect = decrement_call_count(chand);
if (disconnect) {
GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
}
}
gpr_mu_unlock(&chand->server->mu_global);
if (disconnect) {
send_shutdown(chand->channel, 0, 1);
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
}
break; break;
} }
@ -684,14 +623,9 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data)); memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future; calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem); calld->call = grpc_call_from_top_element(elem);
calld->active = 1;
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
gpr_mu_lock(&chand->server->mu_global);
chand->num_calls++;
gpr_mu_unlock(&chand->server->mu_global);
server_ref(chand->server); server_ref(chand->server);
if (initial_op) server_mutate_op(elem, initial_op); if (initial_op) server_mutate_op(elem, initial_op);
@ -700,30 +634,13 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) { static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
int disconnect = 0;
int active;
size_t i; size_t i;
gpr_mu_lock(&chand->server->mu_call); gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) { for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(elem->call_data, i); call_list_remove(elem->call_data, i);
} }
active = calld->active;
calld->active = 0;
gpr_mu_unlock(&chand->server->mu_call); gpr_mu_unlock(&chand->server->mu_call);
if (active) {
gpr_mu_lock(&chand->server->mu_global);
disconnect = decrement_call_count(chand);
if (disconnect) {
GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
}
gpr_mu_unlock(&chand->server->mu_global);
}
if (disconnect) {
send_shutdown(chand->channel, 0, 1);
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
}
if (calld->host) { if (calld->host) {
grpc_mdstr_unref(calld->host); grpc_mdstr_unref(calld->host);
@ -743,7 +660,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(is_first); GPR_ASSERT(is_first);
GPR_ASSERT(!is_last); GPR_ASSERT(!is_last);
chand->server = NULL; chand->server = NULL;
chand->num_calls = 0;
chand->channel = NULL; chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");

@ -173,6 +173,8 @@ typedef struct {
/** have we seen a goaway */ /** have we seen a goaway */
gpr_uint8 seen_goaway; gpr_uint8 seen_goaway;
/** have we sent a goaway */
gpr_uint8 sent_goaway;
/** is this transport a client? */ /** is this transport a client? */
gpr_uint8 is_client; gpr_uint8 is_client;
@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway(
void grpc_chttp2_register_stream(grpc_chttp2_transport *t, void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s); grpc_chttp2_stream *s);
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, /* returns 1 if this is the last stream, 0 otherwise */
grpc_chttp2_stream *s); int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
int grpc_chttp2_has_streams(grpc_chttp2_transport *t);
void grpc_chttp2_for_all_streams( void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,

@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
} }
void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
int grpc_chttp2_has_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
} }
void grpc_chttp2_for_all_streams( void grpc_chttp2_for_all_streams(

@ -385,7 +385,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0); s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map); GPR_ASSERT(!s->global.in_stream_map);
grpc_chttp2_unregister_stream(t, s); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(t);
}
if (!t->parsing_active && s->global.id) { if (!t->parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL); s->global.id) == NULL);
@ -684,10 +686,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
} }
if (op->send_goaway) { if (op->send_goaway) {
t->global.sent_goaway = 1;
grpc_chttp2_goaway_append( grpc_chttp2_goaway_append(
t->global.last_incoming_stream_id, t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
*op->goaway_message, &t->global.qbuf); *op->goaway_message, &t->global.qbuf);
if (!grpc_chttp2_has_streams(t)) {
close_transport_locked(t);
}
} }
if (op->set_accept_stream != NULL) { if (op->set_accept_stream != NULL) {
@ -736,6 +742,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
t->parsing.incoming_stream = NULL; t->parsing.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(&t->parsing); grpc_chttp2_parsing_become_skip_parser(&t->parsing);
} }
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(t);
}
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map); grpc_chttp2_stream_map_size(&t->new_stream_map);

@ -91,7 +91,9 @@ typedef struct grpc_transport_op {
grpc_connectivity_state *connectivity_state; grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */ /** should the transport be disconnected */
int disconnect; int disconnect;
/** should we send a goaway? */ /** should we send a goaway?
after a goaway is sent, once there are no more active calls on
the transport, the transport should disconnect */
int send_goaway; int send_goaway;
/** what should the goaway contain? */ /** what should the goaway contain? */
grpc_status_code goaway_status; grpc_status_code goaway_status;

Loading…
Cancel
Save