Fix shutdown semantics.

Document what they should be, ensure they're triggered, and fix what was broken.
pull/561/head
Craig Tiller 10 years ago
parent e1b97b608a
commit aea2fc053d
  1. 10
      include/grpc/grpc.h
  2. 35
      src/core/surface/server.c
  3. 2
      test/core/end2end/tests/early_server_shutdown_finishes_tags.c

@ -564,15 +564,19 @@ void grpc_server_start(grpc_server *server);
/* Begin shutting down a server. /* Begin shutting down a server.
After completion, no new calls or connections will be admitted. After completion, no new calls or connections will be admitted.
Existing calls will be allowed to complete. */ Existing calls will be allowed to complete.
Shutdown is idempotent. */
void grpc_server_shutdown(grpc_server *server); void grpc_server_shutdown(grpc_server *server);
/* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when /* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when
there are no more calls being serviced. */ there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag); void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
/* Destroy a server. /* Destroy a server.
Forcefully cancels all existing calls. */ Forcefully cancels all existing calls.
Implies grpc_server_shutdown() if one was not previously performed. */
void grpc_server_destroy(grpc_server *server); void grpc_server_destroy(grpc_server *server);
#ifdef __cplusplus #ifdef __cplusplus

@ -98,8 +98,8 @@ struct grpc_server {
size_t requested_call_capacity; size_t requested_call_capacity;
gpr_uint8 shutdown; gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag; size_t num_shutdown_tags;
void *shutdown_tag; void **shutdown_tags;
call_data *lists[CALL_LIST_COUNT]; call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data; channel_data root_channel_data;
@ -206,6 +206,7 @@ static void server_unref(grpc_server *server) {
gpr_mu_destroy(&server->mu); gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters); gpr_free(server->channel_filters);
gpr_free(server->requested_calls); gpr_free(server->requested_calls);
gpr_free(server->shutdown_tags);
gpr_free(server); gpr_free(server);
} }
} }
@ -407,15 +408,17 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) { static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
int i; size_t i;
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) { for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(chand->server, elem->call_data, i); call_list_remove(chand->server, elem->call_data, i);
} }
if (chand->server->shutdown && chand->server->have_shutdown_tag && if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
chand->server->lists[ALL_CALLS] == NULL) { for (i = 0; i < chand->server->num_shutdown_tags; i++) {
grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag); grpc_cq_end_server_shutdown(chand->server->cq,
chand->server->shutdown_tags[i]);
}
} }
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu);
@ -572,6 +575,13 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
/* lock, and gather up some stuff to do */ /* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu);
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1));
server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
}
if (server->shutdown) { if (server->shutdown) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu);
return; return;
@ -597,12 +607,9 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server->requested_call_count = 0; server->requested_call_count = 0;
server->shutdown = 1; server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
if (server->lists[ALL_CALLS] == NULL) { if (server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(server->cq, shutdown_tag); for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_server_shutdown(server->cq, server->shutdown_tags[i]);
} }
} }
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu);
@ -653,6 +660,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
void grpc_server_destroy(grpc_server *server) { void grpc_server_destroy(grpc_server *server) {
channel_data *c; channel_data *c;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu);
if (!server->shutdown) {
gpr_mu_unlock(&server->mu);
grpc_server_shutdown(server);
gpr_mu_lock(&server->mu);
}
for (c = server->root_channel_data.next; c != &server->root_channel_data; for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) { c = c->next) {
shutdown_channel(c); shutdown_channel(c);

@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown(f->server); /* don't shutdown, just destroy, to tickle this code edge */
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;
} }

Loading…
Cancel
Save