Merge branch 'you-complete-me' into we-dont-need-no-cv

pull/1888/head
Craig Tiller 10 years ago
commit ba2e755701
  1. 2
      Makefile
  2. 2
      build.json
  3. 1
      include/grpc++/server.h
  4. 18
      include/grpc/grpc.h
  5. 31
      src/core/iomgr/pollset_posix.c
  6. 5
      src/core/iomgr/resolve_address_posix.c
  7. 4
      src/core/support/sync.c
  8. 36
      src/core/surface/completion_queue.c
  9. 195
      src/core/surface/server.c
  10. 13
      src/cpp/server/server.cc
  11. 2
      src/python/src/grpc/_adapter/_c_test.py
  12. 4
      test/core/bad_client/bad_client.c
  13. 3
      test/core/end2end/dualstack_socket_test.c
  14. 3
      test/core/end2end/tests/bad_hostname.c
  15. 3
      test/core/end2end/tests/cancel_after_accept.c
  16. 3
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  17. 3
      test/core/end2end/tests/cancel_after_invoke.c
  18. 3
      test/core/end2end/tests/cancel_before_invoke.c
  19. 2
      test/core/end2end/tests/cancel_in_a_vacuum.c
  20. 7
      test/core/end2end/tests/census_simple_request.c
  21. 4
      test/core/end2end/tests/disappearing_server.c
  22. 14
      test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
  23. 13
      test/core/end2end/tests/early_server_shutdown_finishes_tags.c
  24. 3
      test/core/end2end/tests/empty_batch.c
  25. 7
      test/core/end2end/tests/graceful_server_shutdown.c
  26. 3
      test/core/end2end/tests/invoke_large_request.c
  27. 3
      test/core/end2end/tests/max_concurrent_streams.c
  28. 3
      test/core/end2end/tests/max_message_length.c
  29. 5
      test/core/end2end/tests/no_op.c
  30. 3
      test/core/end2end/tests/ping_pong_streaming.c
  31. 3
      test/core/end2end/tests/registered_call.c
  32. 3
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  33. 3
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  34. 3
      test/core/end2end/tests/request_response_with_payload.c
  35. 3
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  36. 3
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  37. 3
      test/core/end2end/tests/request_with_large_metadata.c
  38. 3
      test/core/end2end/tests/request_with_payload.c
  39. 3
      test/core/end2end/tests/server_finishes_request.c
  40. 3
      test/core/end2end/tests/simple_delayed_request.c
  41. 3
      test/core/end2end/tests/simple_request.c
  42. 3
      test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
  43. 3
      test/core/fling/server.c
  44. 2
      tools/doxygen/Doxyfile.c++
  45. 2
      tools/doxygen/Doxyfile.core

@ -308,7 +308,7 @@ E = @echo
Q = @
endif
VERSION = 0.9.0.0
VERSION = 0.10.0.0
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)

@ -6,7 +6,7 @@
"#": "The public version number of the library.",
"version": {
"major": 0,
"minor": 9,
"minor": 10,
"micro": 0,
"build": 0
}

@ -77,6 +77,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
class SyncRequest;
class AsyncRequest;
class ShutdownRequest;
// ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,

@ -506,18 +506,20 @@ void grpc_server_start(grpc_server *server);
/* Begin shutting down a server.
After completion, no new calls or connections will be admitted.
Existing calls will be allowed to complete.
Shutdown is idempotent. */
void grpc_server_shutdown(grpc_server *server);
/* As per grpc_server_shutdown, but send a GRPC_OP_COMPLETE event when
there are no more calls being serviced.
Send a GRPC_OP_COMPLETE event when there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag);
/* Cancel all in-progress calls.
Only usable after shutdown. */
void grpc_server_cancel_all_calls(grpc_server *server);
/* Destroy a server.
Forcefully cancels all existing calls.
Implies grpc_server_shutdown() if one was not previously performed. */
Shutdown must have completed beforehand (i.e. all tags generated by
grpc_server_shutdown_and_notify must have been received, and at least
one call to grpc_server_shutdown_and_notify must have been made). */
void grpc_server_destroy(grpc_server *server);
#ifdef __cplusplus

@ -118,8 +118,6 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now();
/* FIXME(ctiller): see below */
gpr_timespec maximum_deadline = gpr_time_add(now, gpr_time_from_seconds(1));
int r;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
@ -130,14 +128,25 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1;
}
/* FIXME(ctiller): we should not clamp deadline, however we have some
stuck at shutdown bugs that this resolves */
if (gpr_time_cmp(deadline, maximum_deadline) > 0) {
deadline = maximum_deadline;
if (pollset->shutting_down) {
return 1;
}
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
} else if (pollset->in_flight_cbs == 0) {
gpr_mu_unlock(&pollset->mu);
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
}
}
return r;
}
@ -145,13 +154,19 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
int in_flight_cbs;
int counter;
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
in_flight_cbs = pollset->in_flight_cbs;
counter = pollset->counter;
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
if (counter > 0) {
grpc_pollset_kick(pollset);
}
gpr_mu_unlock(&pollset->mu);
if (in_flight_cbs == 0) {
if (in_flight_cbs == 0 && counter == 0) {
shutdown_done(shutdown_done_arg);
}
}
@ -238,7 +253,7 @@ static void unary_poll_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0) {
if (pollset->in_flight_cbs == 0 && pollset->counter == 0) {
do_shutdown_cb = 1;
}
} else if (grpc_fd_is_orphaned(fd)) {

@ -166,13 +166,14 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
/*gpr_thd_id id;*/
grpc_iomgr_ref();
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
gpr_thd_new(&id, do_request, r, NULL);
/*gpr_thd_new(&id, do_request, r, NULL);*/
do_request(r);
}
#endif

@ -118,7 +118,9 @@ void gpr_refn(gpr_refcount *r, int n) {
}
int gpr_unref(gpr_refcount *r) {
return gpr_atm_full_fetch_add(&r->count, -1) == 1;
gpr_atm prior = gpr_atm_full_fetch_add(&r->count, -1);
GPR_ASSERT(prior > 0);
return prior == 1;
}
void gpr_stats_init(gpr_stats_counter *c, gpr_intptr n) {

@ -80,7 +80,8 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
gpr_ref_init(&cc->owning_refs, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
return cc;
}
@ -91,14 +92,14 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
static void on_pollset_destroy_done(void *arg) {
grpc_completion_queue *cc = arg;
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
grpc_cq_internal_unref(cc);
}
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->queue == NULL);
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
@ -136,24 +137,24 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
grpc_completion_type type) {
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
}
}
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) {
event *ev;
int shutdown = 0;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
end_op_locked(cc, GRPC_OP_COMPLETE);
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
shutdown = 1;
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
if (shutdown) {
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@ -169,6 +170,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
event *ev = NULL;
grpc_event ret;
grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
@ -200,6 +202,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
return ret;
}
}
@ -207,6 +210,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
return ret;
}
@ -244,6 +248,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
event *ev = NULL;
grpc_event ret;
grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
@ -258,6 +263,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
return ret;
}
}
@ -265,6 +271,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
grpc_cq_internal_unref(cc);
return ret;
}
@ -280,6 +287,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}

@ -124,6 +124,11 @@ struct channel_data {
gpr_uint32 registered_method_max_probes;
};
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
} shutdown_tag;
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
@ -134,14 +139,14 @@ struct grpc_server {
size_t cq_count;
gpr_mu mu;
gpr_cv cv;
registered_method *registered_methods;
requested_call_array requested_calls;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
size_t num_shutdown_tags;
void **shutdown_tags;
shutdown_tag *shutdown_tags;
call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data;
@ -256,29 +261,32 @@ static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
static void server_unref(grpc_server *server) {
static void server_delete(grpc_server *server) {
registered_method *rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
grpc_cq_internal_unref(server->cqs[i]);
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
gpr_free(server);
}
static void server_unref(grpc_server *server) {
if (gpr_unref(&server->internal_refcount)) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_cv_destroy(&server->cv);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
grpc_cq_internal_unref(server->cqs[i]);
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
gpr_free(server);
server_delete(server);
}
}
@ -371,6 +379,26 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
static int num_listeners(grpc_server *server) {
listener *l;
int n = 0;
for (l = server->listeners; l; l = l->next) {
n++;
}
return n;
}
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
NULL, 1);
}
}
}
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@ -430,6 +458,9 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
grpc_iomgr_add_callback(kill_zombie, elem);
}
if (call_list_remove(calld, ALL_CALLS)) {
maybe_finish_shutdown(chand->server);
}
gpr_mu_unlock(&chand->server->mu);
break;
}
@ -526,19 +557,15 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
size_t i, j;
int removed[CALL_LIST_COUNT];
size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(elem->call_data, i);
removed[i] = call_list_remove(elem->call_data, i);
}
if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < chand->server->num_shutdown_tags; i++) {
for (j = 0; j < chand->server->cq_count; j++) {
grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i],
NULL, 1);
}
}
if (removed[ALL_CALLS]) {
maybe_finish_shutdown(chand->server);
}
gpr_mu_unlock(&chand->server->mu);
@ -625,7 +652,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server));
gpr_mu_init(&server->mu);
gpr_cv_init(&server->cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@ -784,38 +810,28 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result;
}
static int num_listeners(grpc_server *server) {
listener *l;
int n = 0;
for (l = server->listeners; l; l = l->next) {
n++;
}
return n;
}
static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
channel_data *c;
size_t nchannels;
size_t i, j;
size_t i;
grpc_channel_op op;
grpc_channel_element *elem;
registered_method *rm;
shutdown_tag *sdt;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
if (have_shutdown_tag) {
for (i = 0; i < server->cq_count; i++) {
grpc_cq_begin_op(server->cqs[i], NULL);
}
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1));
server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
}
grpc_cq_begin_op(cq, NULL);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1));
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
@ -856,13 +872,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
server->shutdown = 1;
if (server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < server->num_shutdown_tags; i++) {
for (j = 0; j < server->cq_count; j++) {
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
}
}
}
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
for (i = 0; i < nchannels; i++) {
@ -892,46 +902,64 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
}
void grpc_server_shutdown(grpc_server *server) {
shutdown_internal(server, 0, NULL);
}
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
shutdown_internal(server, 1, tag);
}
void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s;
gpr_mu_lock(&server->mu);
server->listeners_destroyed++;
gpr_cv_signal(&server->cv);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
}
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
size_t i;
void grpc_server_cancel_all_calls(grpc_server *server) {
call_data *calld;
grpc_call **calls;
size_t call_count;
size_t call_capacity;
int is_first = 1;
size_t i;
gpr_mu_lock(&server->mu);
if (!server->shutdown) {
GPR_ASSERT(server->shutdown);
if (!server->lists[ALL_CALLS]) {
gpr_mu_unlock(&server->mu);
grpc_server_shutdown(server);
gpr_mu_lock(&server->mu);
return;
}
while (server->listeners_destroyed != num_listeners(server)) {
for (i = 0; i < server->cq_count; i++) {
gpr_mu_unlock(&server->mu);
grpc_cq_hack_spin_pollset(server->cqs[i]);
gpr_mu_lock(&server->mu);
call_capacity = 8;
call_count = 0;
calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) {
if (call_count == call_capacity) {
call_capacity *= 2;
calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
}
calls[call_count++] = calld->call;
GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
is_first = 0;
}
gpr_mu_unlock(&server->mu);
gpr_cv_wait(&server->cv, &server->mu,
gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
for (i = 0; i < call_count; i++) {
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable");
GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
}
gpr_free(calls);
}
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
call_data *calld;
gpr_mu_lock(&server->mu);
GPR_ASSERT(server->shutdown);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
l = server->listeners;
server->listeners = l->next;
@ -940,7 +968,6 @@ void grpc_server_destroy(grpc_server *server) {
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
calld->state = ZOMBIED;
grpc_iomgr_add_callback(
kill_zombie,

@ -52,6 +52,14 @@
namespace grpc {
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
delete this;
return false;
}
};
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@ -213,6 +221,9 @@ Server::~Server() {
Shutdown();
}
}
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
grpc_server_destroy(server_);
if (thread_pool_owned_) {
delete thread_pool_;
@ -286,7 +297,7 @@ void Server::Shutdown() {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown(server_);
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
// Wait for running callbacks to finish.

@ -216,4 +216,4 @@ class _CTest(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
unittest.main(verbosity=2)

@ -129,6 +129,10 @@ void grpc_run_bad_client_test(const char *name, const char *client_payload,
/* Shutdown */
grpc_endpoint_destroy(sfd.client);
grpc_server_shutdown_and_notify(a.server, a.cq, NULL);
GPR_ASSERT(grpc_completion_queue_pluck(a.cq, NULL,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1))
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(a.server);
grpc_completion_queue_destroy(a.cq);

@ -196,7 +196,8 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_channel_destroy(client);
/* Destroy server. */
grpc_server_shutdown(server);
grpc_server_shutdown_and_notify(server, cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(server);
grpc_completion_queue_shutdown(cq);
drain_cq(cq);

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -75,7 +75,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -75,7 +75,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;

@ -60,9 +60,12 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
return f;
}
static void *tag(gpr_intptr t) { return (void *)t; }
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
@ -89,8 +92,6 @@ static void end_test(grpc_end2end_test_fixture *f) {
grpc_completion_queue_destroy(f->cq);
}
static void *tag(gpr_intptr t) { return (void *)t; }
static void test_body(grpc_end2end_test_fixture f) {
grpc_call *c;
grpc_call *s;

@ -62,7 +62,6 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_destroy(f->server);
f->server = NULL;
}
@ -133,7 +132,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
/* should be able to shut down the server early
- and still complete the request */
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@ -151,6 +150,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
cq_expect_completion(cqv, tag(102), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_expect_completion(cqv, tag(1000), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);

@ -72,13 +72,6 @@ static void drain_cq(grpc_completion_queue *cq) {
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
@ -86,7 +79,6 @@ static void shutdown_client(grpc_end2end_test_fixture *f) {
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
@ -153,12 +145,16 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
/* shutdown and destroy the server */
shutdown_server(&f);
grpc_server_shutdown_and_notify(f.server, f.cq, tag(1000));
grpc_server_cancel_all_calls(f.server);
cq_expect_completion(cqv, tag(1000), 1);
cq_expect_completion(cqv, tag(102), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
grpc_server_destroy(f.server);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));

@ -72,13 +72,6 @@ static void drain_cq(grpc_completion_queue *cq) {
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
/* don't shutdown, just destroy, to tickle this code edge */
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
@ -86,7 +79,6 @@ static void shutdown_client(grpc_end2end_test_fixture *f) {
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
@ -111,11 +103,14 @@ static void test_early_server_shutdown_finishes_tags(
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq,
f.cq, tag(101)));
grpc_server_shutdown(f.server);
grpc_server_shutdown_and_notify(f.server, f.cq, tag(1000));
cq_expect_completion(cqv, tag(101), 0);
cq_expect_completion(cqv, tag(1000), 1);
cq_verify(cqv);
GPR_ASSERT(s == NULL);
grpc_server_destroy(f.server);
end_test(&f);
config.tear_down_data(&f);
cq_verifier_destroy(cqv);

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -146,7 +146,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
cq_verify(cqv);
/* shutdown and destroy the server */
grpc_server_shutdown_and_notify(f.server, tag(0xdead));
grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
cq_verify_empty(cqv);
op = ops;
@ -164,13 +164,12 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(cqv, tag(102), 1);
cq_verify(cqv);
grpc_call_destroy(s);
cq_expect_completion(cqv, tag(0xdead), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
grpc_call_destroy(s);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));

@ -72,7 +72,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -45,6 +45,8 @@
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
@ -72,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -88,7 +88,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -74,7 +74,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -62,7 +62,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -76,7 +76,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

@ -233,7 +233,8 @@ int main(int argc, char **argv) {
while (!shutdown_finished) {
if (got_sigint && !shutdown_started) {
gpr_log(GPR_INFO, "Shutting down due to SIGINT");
grpc_server_shutdown(server);
grpc_server_shutdown_and_notify(server, cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(cq);
shutdown_started = 1;
}

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 0.9.0.0
PROJECT_NUMBER = 0.10.0.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 0.9.0.0
PROJECT_NUMBER = 0.10.0.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

Loading…
Cancel
Save