Merge branch 'you-complete-me' of github.com:ctiller/grpc into you-complete-me

pull/1731/head
Craig Tiller 10 years ago
commit d9ba0a5cf6
  1. 2
      Makefile
  2. 2
      build.json
  3. 1
      include/grpc++/server.h
  4. 3
      include/grpc/grpc.h
  5. 4
      src/core/support/sync.c
  6. 1
      src/core/surface/completion_queue.c
  7. 36
      src/core/surface/server.c
  8. 13
      src/cpp/server/server.cc
  9. 4
      test/core/bad_client/bad_client.c
  10. 2
      test/core/end2end/dualstack_socket_test.c
  11. 2
      test/core/end2end/tests/bad_hostname.c
  12. 2
      test/core/end2end/tests/cancel_after_accept.c
  13. 2
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  14. 2
      test/core/end2end/tests/cancel_after_invoke.c
  15. 2
      test/core/end2end/tests/cancel_before_invoke.c
  16. 2
      test/core/end2end/tests/cancel_in_a_vacuum.c
  17. 2
      test/core/end2end/tests/census_simple_request.c
  18. 2
      test/core/end2end/tests/disappearing_server.c
  19. 2
      test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
  20. 2
      test/core/end2end/tests/early_server_shutdown_finishes_tags.c
  21. 2
      test/core/end2end/tests/empty_batch.c
  22. 2
      test/core/end2end/tests/graceful_server_shutdown.c
  23. 2
      test/core/end2end/tests/invoke_large_request.c
  24. 2
      test/core/end2end/tests/max_concurrent_streams.c
  25. 2
      test/core/end2end/tests/max_message_length.c
  26. 2
      test/core/end2end/tests/no_op.c
  27. 2
      test/core/end2end/tests/ping_pong_streaming.c
  28. 2
      test/core/end2end/tests/registered_call.c
  29. 2
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  30. 2
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  31. 2
      test/core/end2end/tests/request_response_with_payload.c
  32. 2
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  33. 2
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  34. 2
      test/core/end2end/tests/request_with_large_metadata.c
  35. 2
      test/core/end2end/tests/request_with_payload.c
  36. 2
      test/core/end2end/tests/server_finishes_request.c
  37. 2
      test/core/end2end/tests/simple_delayed_request.c
  38. 2
      test/core/end2end/tests/simple_request.c
  39. 2
      test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
  40. 2
      test/core/fling/server.c

@ -308,7 +308,7 @@ E = @echo
Q = @ Q = @
endif endif
VERSION = 0.9.0.0 VERSION = 0.10.0.0
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)

@ -6,7 +6,7 @@
"#": "The public version number of the library.", "#": "The public version number of the library.",
"version": { "version": {
"major": 0, "major": 0,
"minor": 9, "minor": 10,
"micro": 0, "micro": 0,
"build": 0 "build": 0
} }

@ -77,6 +77,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
class SyncRequest; class SyncRequest;
class AsyncRequest; class AsyncRequest;
class ShutdownRequest;
// ServerBuilder use only // ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,

@ -509,7 +509,8 @@ void grpc_server_start(grpc_server *server);
Send a GRPC_OP_COMPLETE event when there are no more calls being serviced. Send a GRPC_OP_COMPLETE event when there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */ grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag); void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag);
/* Cancel all in-progress calls. /* Cancel all in-progress calls.
Only usable after shutdown. */ Only usable after shutdown. */

@ -118,7 +118,9 @@ void gpr_refn(gpr_refcount *r, int n) {
} }
int gpr_unref(gpr_refcount *r) { int gpr_unref(gpr_refcount *r) {
return gpr_atm_full_fetch_add(&r->count, -1) == 1; gpr_atm prior = gpr_atm_full_fetch_add(&r->count, -1);
GPR_ASSERT(prior > 0);
return prior == 1;
} }
void gpr_stats_init(gpr_stats_counter *c, gpr_intptr n) { void gpr_stats_init(gpr_stats_counter *c, gpr_intptr n) {

@ -150,7 +150,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) { int success) {
event *ev; event *ev;
int shutdown = 0; int shutdown = 0;
gpr_log(GPR_DEBUG, "end_op:%p", tag);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success; ev->base.success = success;

@ -124,6 +124,11 @@ struct channel_data {
gpr_uint32 registered_method_max_probes; gpr_uint32 registered_method_max_probes;
}; };
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
} shutdown_tag;
struct grpc_server { struct grpc_server {
size_t channel_filter_count; size_t channel_filter_count;
const grpc_channel_filter **channel_filters; const grpc_channel_filter **channel_filters;
@ -139,8 +144,9 @@ struct grpc_server {
requested_call_array requested_calls; requested_call_array requested_calls;
gpr_uint8 shutdown; gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
size_t num_shutdown_tags; size_t num_shutdown_tags;
void **shutdown_tags; shutdown_tag *shutdown_tags;
call_data *lists[CALL_LIST_COUNT]; call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data; channel_data root_channel_data;
@ -383,13 +389,12 @@ static int num_listeners(grpc_server *server) {
} }
static void maybe_finish_shutdown(grpc_server *server) { static void maybe_finish_shutdown(grpc_server *server) {
size_t i, j; size_t i;
if (server->shutdown && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) { if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) { for (i = 0; i < server->num_shutdown_tags; i++) {
for (j = 0; j < server->cq_count; j++) { grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
NULL, 1);
}
} }
} }
} }
@ -453,8 +458,9 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
grpc_iomgr_add_callback(kill_zombie, elem); grpc_iomgr_add_callback(kill_zombie, elem);
} }
call_list_remove(calld, ALL_CALLS); if (call_list_remove(calld, ALL_CALLS)) {
maybe_finish_shutdown(chand->server); maybe_finish_shutdown(chand->server);
}
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu);
break; break;
} }
@ -804,7 +810,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result; return result;
} }
void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) { void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l; listener *l;
requested_call_array requested_calls; requested_call_array requested_calls;
channel_data **channels; channel_data **channels;
@ -814,16 +821,17 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) {
grpc_channel_op op; grpc_channel_op op;
grpc_channel_element *elem; grpc_channel_element *elem;
registered_method *rm; registered_method *rm;
shutdown_tag *sdt;
/* lock, and gather up some stuff to do */ /* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu);
for (i = 0; i < server->cq_count; i++) { grpc_cq_begin_op(cq, NULL);
grpc_cq_begin_op(server->cqs[i], NULL);
}
server->shutdown_tags = server->shutdown_tags =
gpr_realloc(server->shutdown_tags, gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1)); sizeof(void *) * (server->num_shutdown_tags + 1));
server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag; sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) { if (server->shutdown) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu);
return; return;

@ -52,6 +52,14 @@
namespace grpc { namespace grpc {
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
delete this;
return false;
}
};
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public: public:
SyncRequest(RpcServiceMethod* method, void* tag) SyncRequest(RpcServiceMethod* method, void* tag)
@ -213,6 +221,9 @@ Server::~Server() {
Shutdown(); Shutdown();
} }
} }
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
grpc_server_destroy(server_); grpc_server_destroy(server_);
if (thread_pool_owned_) { if (thread_pool_owned_) {
delete thread_pool_; delete thread_pool_;
@ -286,7 +297,7 @@ void Server::Shutdown() {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) { if (started_ && !shutdown_) {
shutdown_ = true; shutdown_ = true;
grpc_server_shutdown(server_); grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown(); cq_.Shutdown();
// Wait for running callbacks to finish. // Wait for running callbacks to finish.

@ -129,6 +129,10 @@ void grpc_run_bad_client_test(const char *name, const char *client_payload,
/* Shutdown */ /* Shutdown */
grpc_endpoint_destroy(sfd.client); grpc_endpoint_destroy(sfd.client);
grpc_server_shutdown_and_notify(a.server, a.cq, NULL);
GPR_ASSERT(grpc_completion_queue_pluck(a.cq, NULL,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1))
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(a.server); grpc_server_destroy(a.server);
grpc_completion_queue_destroy(a.cq); grpc_completion_queue_destroy(a.cq);

@ -206,7 +206,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_completion_queue_destroy(client_cq); grpc_completion_queue_destroy(client_cq);
/* Destroy server. */ /* Destroy server. */
grpc_server_shutdown_and_notify(server, tag(1000)); grpc_server_shutdown_and_notify(server, server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(server); grpc_server_destroy(server);
grpc_completion_queue_shutdown(server_cq); grpc_completion_queue_shutdown(server_cq);

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -64,7 +64,7 @@ static void *tag(gpr_intptr t) { return (void *)t; }
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -136,7 +136,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
/* should be able to shut down the server early /* should be able to shut down the server early
- and still complete the request */ - and still complete the request */
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
op = ops; op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA; op->op = GRPC_OP_SEND_INITIAL_METADATA;

@ -149,7 +149,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
/* shutdown and destroy the server */ /* shutdown and destroy the server */
grpc_server_shutdown_and_notify(f.server, tag(1000)); grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(1000));
grpc_server_cancel_all_calls(f.server); grpc_server_cancel_all_calls(f.server);
cq_expect_completion(v_server, tag(102), 1); cq_expect_completion(v_server, tag(102), 1);

@ -106,7 +106,7 @@ static void test_early_server_shutdown_finishes_tags(
grpc_server_request_call(f.server, &s, &call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
grpc_server_shutdown_and_notify(f.server, tag(1000)); grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(1000));
cq_expect_completion(v_server, tag(101), 0); cq_expect_completion(v_server, tag(101), 0);
cq_expect_completion(v_server, tag(1000), 1); cq_expect_completion(v_server, tag(1000), 1);
cq_verify(v_server); cq_verify(v_server);

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -150,7 +150,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
cq_verify(v_server); cq_verify(v_server);
/* shutdown and destroy the server */ /* shutdown and destroy the server */
grpc_server_shutdown_and_notify(f.server, tag(0xdead)); grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(0xdead));
cq_verify_empty(v_server); cq_verify_empty(v_server);
op = ops; op = ops;

@ -72,7 +72,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -88,7 +88,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -62,7 +62,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) { static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return; if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000)); grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
f->server = NULL; f->server = NULL;

@ -233,7 +233,7 @@ int main(int argc, char **argv) {
while (!shutdown_finished) { while (!shutdown_finished) {
if (got_sigint && !shutdown_started) { if (got_sigint && !shutdown_started) {
gpr_log(GPR_INFO, "Shutting down due to SIGINT"); gpr_log(GPR_INFO, "Shutting down due to SIGINT");
grpc_server_shutdown_and_notify(server, tag(1000)); grpc_server_shutdown_and_notify(server, cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); GPR_ASSERT(grpc_completion_queue_pluck(cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(cq); grpc_completion_queue_shutdown(cq);
shutdown_started = 1; shutdown_started = 1;

Loading…
Cancel
Save