Merge pull request #88 from ctiller/shutdown

Add an optional server shutdown event.
pull/93/head
Yang Gao 10 years ago
commit aa6f993841
  1. 206
      Makefile
  2. 5
      include/grpc/grpc.h
  3. 7
      src/core/surface/completion_queue.c
  4. 2
      src/core/surface/completion_queue.h
  5. 3
      src/core/surface/event_string.c
  6. 25
      src/core/surface/server.c
  7. 8
      test/core/end2end/cq_verifier.c
  8. 1
      test/core/end2end/cq_verifier.h
  9. 1
      test/core/end2end/gen_build_json.py
  10. 160
      test/core/end2end/tests/graceful_server_shutdown.c

File diff suppressed because one or more lines are too long

@ -194,6 +194,7 @@ typedef enum grpc_completion_type {
GRPC_FINISHED, /* An RPC has finished. The event contains status. GRPC_FINISHED, /* An RPC has finished. The event contains status.
On the server this will be OK or Cancelled. */ On the server this will be OK or Cancelled. */
GRPC_SERVER_RPC_NEW, /* A new RPC has arrived at the server */ GRPC_SERVER_RPC_NEW, /* A new RPC has arrived at the server */
GRPC_SERVER_SHUTDOWN, /* The server has finished shutting down */
GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include
a default: case */ a default: case */
} grpc_completion_type; } grpc_completion_type;
@ -439,6 +440,10 @@ void grpc_server_start(grpc_server *server);
Existing calls will be allowed to complete. */ Existing calls will be allowed to complete. */
void grpc_server_shutdown(grpc_server *server); void grpc_server_shutdown(grpc_server *server);
/* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when
there are no more calls being serviced. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
/* Destroy a server. /* Destroy a server.
Forcefully cancels all existing calls. */ Forcefully cancels all existing calls. */
void grpc_server_destroy(grpc_server *server); void grpc_server_destroy(grpc_server *server);

@ -155,6 +155,13 @@ static void end_op_locked(grpc_completion_queue *cc,
} }
} }
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL);
end_op_locked(cc, GRPC_SERVER_SHUTDOWN);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data, grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) { grpc_byte_buffer *read) {

@ -97,6 +97,8 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count, gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements); grpc_metadata *metadata_elements);
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
/* disable polling for some tests */ /* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);

@ -63,6 +63,9 @@ char *grpc_event_string(grpc_event *ev) {
if (ev == NULL) return gpr_strdup("null"); if (ev == NULL) return gpr_strdup("null");
switch (ev->type) { switch (ev->type) {
case GRPC_SERVER_SHUTDOWN:
p += sprintf(p, "SERVER_SHUTDOWN");
break;
case GRPC_QUEUE_SHUTDOWN: case GRPC_QUEUE_SHUTDOWN:
p += sprintf(p, "QUEUE_SHUTDOWN"); p += sprintf(p, "QUEUE_SHUTDOWN");
break; break;

@ -81,6 +81,8 @@ struct grpc_server {
size_t tag_cap; size_t tag_cap;
gpr_uint8 shutdown; gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
void *shutdown_tag;
call_data *lists[CALL_LIST_COUNT]; call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data; channel_data root_channel_data;
@ -375,6 +377,10 @@ static void destroy_call_elem(grpc_call_element *elem) {
for (i = 0; i < CALL_LIST_COUNT; i++) { for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(chand->server, elem->call_data, i); call_list_remove(chand->server, elem->call_data, i);
} }
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
}
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu);
server_unref(chand->server); server_unref(chand->server);
@ -513,7 +519,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
grpc_channel_get_channel_stack(channel), transport); grpc_channel_get_channel_stack(channel), transport);
} }
void grpc_server_shutdown(grpc_server *server) { void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l; listener *l;
void **tags; void **tags;
size_t ntags; size_t ntags;
@ -551,6 +558,14 @@ void grpc_server_shutdown(grpc_server *server) {
server->ntags = 0; server->ntags = 0;
server->shutdown = 1; server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
if (server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
}
}
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu);
for (i = 0; i < nchannels; i++) { for (i = 0; i < nchannels; i++) {
@ -583,6 +598,14 @@ void grpc_server_shutdown(grpc_server *server) {
} }
} }
void grpc_server_shutdown(grpc_server *server) {
shutdown_internal(server, 0, NULL);
}
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
shutdown_internal(server, 1, tag);
}
void grpc_server_destroy(grpc_server *server) { void grpc_server_destroy(grpc_server *server) {
channel_data *c; channel_data *c;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu);

@ -223,6 +223,8 @@ static void verify_matches(expectation *e, grpc_event *ev) {
GPR_ASSERT(ev->data.read == NULL); GPR_ASSERT(ev->data.read == NULL);
} }
break; break;
case GRPC_SERVER_SHUTDOWN:
break;
case GRPC_COMPLETION_DO_NOT_USE: case GRPC_COMPLETION_DO_NOT_USE:
gpr_log(GPR_ERROR, "not implemented"); gpr_log(GPR_ERROR, "not implemented");
abort(); abort();
@ -295,6 +297,8 @@ static size_t expectation_to_string(char *out, expectation *e) {
len = sprintf(out, "GRPC_READ data=%s", str); len = sprintf(out, "GRPC_READ data=%s", str);
gpr_free(str); gpr_free(str);
return len; return len;
case GRPC_SERVER_SHUTDOWN:
return sprintf(out, "GRPC_SERVER_SHUTDOWN");
case GRPC_COMPLETION_DO_NOT_USE: case GRPC_COMPLETION_DO_NOT_USE:
case GRPC_QUEUE_SHUTDOWN: case GRPC_QUEUE_SHUTDOWN:
gpr_log(GPR_ERROR, "not implemented"); gpr_log(GPR_ERROR, "not implemented");
@ -487,3 +491,7 @@ void cq_expect_finished(cq_verifier *v, void *tag, ...) {
finished_internal(v, tag, GRPC_STATUS__DO_NOT_USE, NULL, args); finished_internal(v, tag, GRPC_STATUS__DO_NOT_USE, NULL, args);
va_end(args); va_end(args);
} }
void cq_expect_server_shutdown(cq_verifier *v, void *tag) {
add(v, GRPC_SERVER_SHUTDOWN, tag);
}

@ -70,5 +70,6 @@ void cq_expect_finished_with_status(cq_verifier *v, void *tag,
grpc_status_code status_code, grpc_status_code status_code,
const char *details, ...); const char *details, ...);
void cq_expect_finished(cq_verifier *v, void *tag, ...); void cq_expect_finished(cq_verifier *v, void *tag, ...);
void cq_expect_server_shutdown(cq_verifier *v, void *tag);
#endif /* __GRPC_TEST_END2END_CQ_VERIFIER_H__ */ #endif /* __GRPC_TEST_END2END_CQ_VERIFIER_H__ */

@ -25,6 +25,7 @@ END2END_TESTS = [
'disappearing_server', 'disappearing_server',
'early_server_shutdown_finishes_inflight_calls', 'early_server_shutdown_finishes_inflight_calls',
'early_server_shutdown_finishes_tags', 'early_server_shutdown_finishes_tags',
'graceful_server_shutdown',
'invoke_large_request', 'invoke_large_request',
'max_concurrent_streams', 'max_concurrent_streams',
'no_op', 'no_op',

@ -0,0 +1,160 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_client(&f, client_args);
config.init_server(&f, server_args);
return f;
}
static gpr_timespec n_seconds_time(int n) {
return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n));
}
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->server_cq);
drain_cq(f->server_cq);
grpc_completion_queue_destroy(f->server_cq);
grpc_completion_queue_shutdown(f->client_cq);
drain_cq(f->client_cq);
grpc_completion_queue_destroy(f->client_cq);
}
static void test_early_server_shutdown_finishes_inflight_calls(
grpc_end2end_test_config config) {
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
grpc_call *c;
grpc_call *s;
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
deadline, NULL);
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_server_accept(s, f.server_cq, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_server_end_initial_metadata(s, 0));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_verify(v_client);
/* shutdown the server */
grpc_server_shutdown_and_notify(f.server, tag(0xdead));
cq_verify_empty(v_server);
grpc_call_start_write_status(s, GRPC_STATUS_OK, NULL, tag(103));
grpc_call_destroy(s);
cq_expect_finish_accepted(v_server, tag(103), GRPC_OP_OK);
cq_expect_finished(v_server, tag(102), NULL);
cq_expect_server_shutdown(v_server, tag(0xdead));
cq_verify(v_server);
cq_expect_finished_with_status(v_client, tag(3), GRPC_OP_OK, NULL, NULL);
cq_verify(v_client);
grpc_call_destroy(c);
cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
end_test(&f);
config.tear_down_data(&f);
}
void grpc_end2end_tests(grpc_end2end_test_config config) {
test_early_server_shutdown_finishes_inflight_calls(config);
}
Loading…
Cancel
Save