Fixes and code simplification

pull/6612/head
Craig Tiller 9 years ago
parent d88e15cee7
commit ae09d9dca9
  1. 40
      src/core/lib/iomgr/ev_poll_posix.c
  2. 9
      src/core/lib/surface/server.c
  3. 21
      test/cpp/end2end/hybrid_end2end_test.cc

@ -183,7 +183,6 @@ struct grpc_pollset_worker {
struct grpc_pollset {
gpr_mu mu;
grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
int called_shutdown;
int kicked_without_pollers;
@ -193,10 +192,6 @@ struct grpc_pollset {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
grpc_fd **dels;
/* Local cache of eventfds for workers */
grpc_cached_wakeup_fd *local_wakeup_cache;
};
@ -728,7 +723,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
@ -737,14 +731,10 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->kicked_without_pollers = 0;
pollset->fd_count = 0;
pollset->fd_capacity = 0;
pollset->del_count = 0;
pollset->del_capacity = 0;
pollset->fds = NULL;
pollset->dels = NULL;
}
static void pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
while (pollset->local_wakeup_cache) {
@ -754,17 +744,14 @@ static void pollset_destroy(grpc_pollset *pollset) {
pollset->local_wakeup_cache = next;
}
gpr_free(pollset->fds);
gpr_free(pollset->dels);
gpr_mu_destroy(&pollset->mu);
}
static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
GPR_ASSERT(pollset->fd_count == 0);
GPR_ASSERT(pollset->del_count == 0);
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
@ -797,11 +784,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
for (i = 0; i < pollset->fd_count; i++) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
for (i = 0; i < pollset->del_count; i++) {
GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
}
pollset->fd_count = 0;
pollset->del_count = 0;
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
@ -841,13 +824,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.shutting_down", 0);
goto done;
}
/* Give do_promote priority so we don't starve it out */
if (pollset->in_flight_cbs) {
GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
/* Start polling, and keep doing so while we're being asked to
re-evaluate our pollers (this allows poll() based pollers to
ensure they don't miss wakeups) */
@ -867,7 +843,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int timeout;
int r;
size_t i, j, fd_count;
size_t i, fd_count;
nfds_t pfd_count;
/* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
@ -887,11 +863,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pfds[1].events = POLLIN;
pfds[1].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
int remove = fd_is_orphaned(pollset->fds[i]);
for (j = 0; !remove && j < pollset->del_count; j++) {
if (pollset->fds[i] == pollset->dels[j]) remove = 1;
}
if (remove) {
if (fd_is_orphaned(pollset->fds[i])) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@ -902,10 +874,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pfd_count++;
}
}
for (j = 0; j < pollset->del_count; j++) {
GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
}
pollset->del_count = 0;
pollset->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
@ -997,7 +965,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->shutting_down) {
if (pollset_has_workers(pollset)) {
pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
} else if (!pollset->called_shutdown) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
finish_shutdown(exec_ctx, pollset);
@ -1027,7 +995,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset)) {
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
}
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
if (!pollset->called_shutdown &&
!pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
finish_shutdown(exec_ctx, pollset);

@ -527,6 +527,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
if (request_id == -1) {
continue;
} else {
gpr_log(GPR_DEBUG, "queue lockfree, retries=%d chose=%d", i, cq_idx);
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
@ -537,6 +539,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
/* no cq to take the request found: queue it on the slow list */
gpr_log(GPR_DEBUG, "queue slowpath");
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
@ -1298,12 +1301,14 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
server->requested_calls[request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
gpr_log(GPR_DEBUG, "request against empty");
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
if (request_id == -1) break;
gpr_log(GPR_DEBUG, "drain1");
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@ -1324,6 +1329,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&server->mu_call);
}
gpr_mu_unlock(&server->mu_call);
} else {
gpr_log(GPR_DEBUG, "request lockfree");
}
return GRPC_CALL_OK;
}
@ -1377,6 +1384,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *rm = rmp;
gpr_log(GPR_DEBUG, "method: %s", rm->method);
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@ -1391,6 +1399,7 @@ grpc_call_error grpc_server_request_registered_call(
break;
}
}
gpr_log(GPR_DEBUG, "cq_idx=%d, cq_count=%d", cq_idx, server->cq_count);
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;

@ -217,7 +217,7 @@ class HybridEnd2endTest : public ::testing::Test {
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
cqs_.push_back(builder.AddCompletionQueue(i < num_cqs_frequently_polled));
cqs_.push_back(builder.AddCompletionQueue(i == num_cqs_frequently_polled - 1));
}
server_ = builder.BuildAndStart();
}
@ -253,6 +253,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest send_request;
EchoResponse recv_response;
ClientContext cli_ctx;
cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
EXPECT_EQ(send_request.message(), recv_response.message());
@ -266,6 +267,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest send_request;
EchoResponse recv_response;
ClientContext cli_ctx;
cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
@ -277,6 +279,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoResponse recv_response;
grpc::string expected_message;
ClientContext cli_ctx;
cli_ctx.set_fail_fast(false);
send_request.set_message("Hello");
auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
for (int i = 0; i < 5; i++) {
@ -293,6 +296,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
request.set_message("hello");
auto stream = stub_->ResponseStream(&context, request);
@ -312,6 +316,7 @@ class HybridEnd2endTest : public ::testing::Test {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
grpc::string msg("hello");
auto stream = stub_->BidiStream(&context);
@ -505,12 +510,22 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
SetUpServer(&service, nullptr, &generic_service, 3);
ResetStub();
std::thread generic_handler_thread([this, &generic_service] {
gpr_log(GPR_DEBUG, "t0 start");
HandleGenericCall(&generic_service, cqs_[0].get());
gpr_log(GPR_DEBUG, "t0 done");
});
std::thread request_stream_handler_thread(
[this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
[this, &service] {
gpr_log(GPR_DEBUG, "t1 start");
HandleClientStreaming(&service, cqs_[1].get());
gpr_log(GPR_DEBUG, "t1 done");
});
std::thread response_stream_handler_thread(
[this, &service] { HandleServerStreaming(&service, cqs_[2].get()); });
[this, &service] {
gpr_log(GPR_DEBUG, "t2 start");
HandleServerStreaming(&service, cqs_[2].get());
gpr_log(GPR_DEBUG, "t2 done");
});
TestAllMethods();
generic_handler_thread.join();
request_stream_handler_thread.join();

Loading…
Cancel
Save