From 6deadf5b8881d17e0960a46e5f995e0250e715d3 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 28 Nov 2016 18:02:37 -0800 Subject: [PATCH 1/5] Propagate termination signals to subprocesses --- test/cpp/qps/json_run_localhost.cc | 36 +++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 74e40fbf1a9..70c50cdc71b 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -31,7 +31,11 @@ * */ +#include +#include + #include +#include #include #include @@ -42,6 +46,9 @@ #include "test/cpp/util/subprocess.h" using grpc::SubProcess; +typedef std::unique_ptr SubProcessPtr; +std::vector g_workers; +SubProcessPtr g_driver; template std::string as_string(const T& val) { @@ -50,9 +57,24 @@ std::string as_string(const T& val) { return out.str(); } +static void sighandler(int sig) { + g_driver->Interrupt(); + for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { + (*it)->Interrupt(); + } +} + +static void register_sighandler() { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = sighandler; + + sigaction(SIGINT, &act, NULL); + sigaction(SIGTERM, &act, NULL); +} + int main(int argc, char** argv) { - typedef std::unique_ptr SubProcessPtr; - std::vector jobs; + register_sighandler(); std::string my_bin = argv[0]; std::string bin_dir = my_bin.substr(0, my_bin.rfind('/')); @@ -64,7 +86,7 @@ int main(int argc, char** argv) { auto port = grpc_pick_unused_port_or_die(); std::vector args = {bin_dir + "/qps_worker", "-driver_port", as_string(port)}; - jobs.emplace_back(new SubProcess(args)); + g_workers.emplace_back(new SubProcess(args)); if (!first) env << ","; env << "localhost:" << port; first = false; @@ -75,12 +97,14 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - GPR_ASSERT(SubProcess(args).Join() == 0); - for (auto it = jobs.begin(); it != jobs.end(); ++it) { + g_driver.reset(new SubProcess(args)); + const int driver_join_status = g_driver->Join(); + for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { (*it)->Interrupt(); } - for (auto it = jobs.begin(); it != jobs.end(); ++it) { + for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { (*it)->Join(); } + GPR_ASSERT(driver_join_status == 0); } From a05909faf2394433e841a5eb4399e89afd05c26a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 28 Nov 2016 18:02:52 -0800 Subject: [PATCH 2/5] Added pid logging to subprocess --- src/core/lib/support/subprocess_posix.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index 4f4de9298e9..074769879be 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -97,7 +97,8 @@ retry: if (errno == EINTR) { goto retry; } - gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid, + strerror(errno)); return -1; } return status; From d6936b6bee465f903a5788dd2c76bbd49f8cb48b Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 29 Nov 2016 17:22:48 -0800 Subject: [PATCH 3/5] PR comments --- test/cpp/qps/json_run_localhost.cc | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 70c50cdc71b..59a5a89ca34 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -47,8 +47,9 @@ using grpc::SubProcess; typedef std::unique_ptr SubProcessPtr; -std::vector g_workers; SubProcessPtr g_driver; +constexpr auto kNumWorkers = 2; +std::vector g_workers(2); template std::string as_string(const T& val) { @@ -58,10 +59,11 @@ std::string as_string(const T& val) { } static void sighandler(int sig) { + const int errno_saved = errno; g_driver->Interrupt(); - for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { - (*it)->Interrupt(); - } + for (const auto& worker : g_workers) + if (worker) worker->Interrupt(); + errno = errno_saved; } static void register_sighandler() { @@ -82,11 +84,11 @@ int main(int argc, char** argv) { std::ostringstream env; bool first = true; - for (int i = 0; i < 2; i++) { - auto port = grpc_pick_unused_port_or_die(); + for (int i = 0; i < kNumWorkers; i++) { + const auto port = grpc_pick_unused_port_or_die(); std::vector args = {bin_dir + "/qps_worker", "-driver_port", as_string(port)}; - g_workers.emplace_back(new SubProcess(args)); + g_workers[i].reset(new SubProcess(args)); if (!first) env << ","; env << "localhost:" << port; first = false; @@ -100,11 +102,9 @@ int main(int argc, char** argv) { g_driver.reset(new SubProcess(args)); const int driver_join_status = g_driver->Join(); - for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { - (*it)->Interrupt(); - } - for (auto it = g_workers.begin(); it != g_workers.end(); ++it) { - (*it)->Join(); - } + for (const auto& worker : g_workers) + if (worker) worker->Interrupt(); + for (const auto& worker : g_workers) + if (worker) worker->Join(); GPR_ASSERT(driver_join_status == 0); } From 523a4aa0a06de21ffe23582b373d18488f78c8e2 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 30 Nov 2016 10:50:39 -0800 Subject: [PATCH 4/5] PR comments --- test/cpp/qps/json_run_localhost.cc | 31 ++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index b6e96601d19..409abed12e8 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -46,10 +46,11 @@ #include "test/cpp/util/subprocess.h" using grpc::SubProcess; -typedef std::unique_ptr SubProcessPtr; -SubProcessPtr g_driver; + constexpr auto kNumWorkers = 2; -std::vector g_workers(2); + +static SubProcess* g_driver; +static SubProcess* g_workers[kNumWorkers]; template std::string as_string(const T& val) { @@ -61,8 +62,9 @@ std::string as_string(const T& val) { static void sighandler(int sig) { const int errno_saved = errno; g_driver->Interrupt(); - for (const auto& worker : g_workers) - if (worker) worker->Interrupt(); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); + } errno = errno_saved; } @@ -100,7 +102,7 @@ int main(int argc, char** argv) { const auto port = grpc_pick_unused_port_or_die(); std::vector args = {bin_dir + "/qps_worker", "-driver_port", as_string(port)}; - g_workers[i].reset(new SubProcess(args)); + g_workers[i] = new SubProcess(args); if (!first) env << ","; env << "localhost:" << port; first = false; @@ -112,20 +114,25 @@ int main(int argc, char** argv) { args.push_back(argv[i]); } - g_driver.reset(new SubProcess(args)); + g_driver = new SubProcess(args); const int driver_join_status = g_driver->Join(); if (driver_join_status != 0) { LogStatus(driver_join_status, "driver"); } - for (const auto& worker : g_workers) { - if (worker) worker->Interrupt(); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); } - for (const auto& worker : g_workers) { - if (worker) { - const int worker_status = worker->Join(); + + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) { + const int worker_status = g_workers[i]->Join(); if (worker_status != 0) { LogStatus(worker_status, "worker"); } } } + + delete g_driver; + for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i]; + GPR_ASSERT(driver_join_status == 0); } From 3c5a868a1bbd0bf08d3c6a4213f7807f3cfdb893 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 30 Nov 2016 11:26:58 -0800 Subject: [PATCH 5/5] PR comments #2 --- test/cpp/qps/json_run_localhost.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 409abed12e8..b7b2553f12d 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -61,7 +61,7 @@ std::string as_string(const T& val) { static void sighandler(int sig) { const int errno_saved = errno; - g_driver->Interrupt(); + if (g_driver != NULL) g_driver->Interrupt(); for (int i = 0; i < kNumWorkers; ++i) { if (g_workers[i]) g_workers[i]->Interrupt(); } @@ -133,6 +133,7 @@ int main(int argc, char** argv) { } delete g_driver; + g_driver = NULL; for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i]; GPR_ASSERT(driver_join_status == 0); }