Add thread naming support on platforms that support it.

As a client of grpc I want to be aware of which threads are being
created by grpc, and giving them recognizable names makes it significantly
easier to diagnose what is going on in my programs.
This provides thread names for macOS and Linux. Adding support for other
platforms should be easy for platform specialists.
pull/13559/head
Dave MacLachlan 7 years ago
parent c99a3ca415
commit af5c54de9c
  1. 3
      include/grpc/impl/codegen/port_platform.h
  2. 5
      include/grpc/support/thd.h
  3. 2
      src/core/lib/iomgr/ev_poll_posix.cc
  4. 8
      src/core/lib/iomgr/executor.cc
  5. 2
      src/core/lib/iomgr/timer_manager.cc
  6. 19
      src/core/lib/support/thd_posix.cc
  7. 3
      src/core/lib/support/thd_windows.cc
  8. 2
      test/core/bad_client/bad_client.cc
  9. 2
      test/core/end2end/bad_server_response_test.cc
  10. 3
      test/core/end2end/fixtures/http_proxy_fixture.cc
  11. 3
      test/core/end2end/fixtures/proxy.cc
  12. 3
      test/core/end2end/tests/connectivity.cc
  13. 3
      test/core/handshake/client_ssl.cc
  14. 3
      test/core/handshake/server_ssl_common.cc
  15. 3
      test/core/iomgr/combiner_test.cc
  16. 2
      test/core/iomgr/resolve_address_posix_test.cc
  17. 10
      test/core/iomgr/wakeup_fd_cv_test.cc
  18. 3
      test/core/support/arena_test.cc
  19. 2
      test/core/support/cpu_test.cc
  20. 9
      test/core/support/mpscq_test.cc
  21. 3
      test/core/support/spinlock_test.cc
  22. 4
      test/core/support/sync_test.cc
  23. 5
      test/core/support/thd_test.cc
  24. 2
      test/core/support/tls_test.cc
  25. 5
      test/core/surface/completion_queue_threading_test.cc
  26. 16
      test/core/surface/concurrent_connectivity_test.cc
  27. 3
      test/core/surface/sequential_connectivity_test.cc

@ -166,6 +166,7 @@
#define GPR_POSIX_SYNC 1
#define GPR_POSIX_TIME 1
#define GPR_GETPID_IN_UNISTD_H 1
#define GPR_LINUX_PTHREAD_NAME 1
#ifdef _LP64
#define GPR_ARCH_64 1
#else /* _LP64 */
@ -195,6 +196,7 @@
#else /* __MAC_OS_X_VERSION_MIN_REQUIRED < __MAC_10_7 */
#define GPR_CPU_POSIX 1
#define GPR_GCC_TLS 1
#define GPR_APPLE_PTHREAD_NAME 1
#endif
#else /* __MAC_OS_X_VERSION_MIN_REQUIRED */
#define GPR_CPU_POSIX 1
@ -236,6 +238,7 @@
#define GPR_POSIX_TIME 1
#define GPR_GETPID_IN_UNISTD_H 1
#define GPR_SUPPORT_CHANNELS_FROM_FD 1
#define GPR_BSD_PTHREAD_NAME 1
#ifdef _LP64
#define GPR_ARCH_64 1
#else /* _LP64 */

@ -42,9 +42,12 @@ typedef struct {
/** Create a new thread running (*thd_body)(arg) and place its thread identifier
in *t, and return true. If there are insufficient resources, return false.
thd_name is the name of the thread for identification purposes on platforms
that support thread naming.
If options==NULL, default options are used.
The thread is immediately runnable, and exits when (*thd_body)() returns. */
GPRAPI int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg,
GPRAPI int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
void (*thd_body)(void* arg), void* arg,
const gpr_thd_options* options);
/** Return a gpr_thd_options struct with all fields set to defaults. */

@ -1382,7 +1382,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_ref(&g_cvfds.pollcount);
gpr_thd_options_set_detached(&opt);
GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
GPR_ASSERT(gpr_thd_new(&t_id, "gpr_poller", &run_poll, pargs, &opt));
return pargs;
}

@ -104,8 +104,8 @@ void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
&opt);
gpr_thd_new(&g_thread_state[0].id, "gpr_executor", executor_thread,
&g_thread_state[0], &opt);
} else {
if (cur_threads == 0) return;
for (size_t i = 0; i < g_max_threads; i++) {
@ -263,8 +263,8 @@ static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
&g_thread_state[cur_thread_count], &opt);
gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
executor_thread, &g_thread_state[cur_thread_count], &opt);
}
gpr_spinlock_unlock(&g_adding_thread_lock);
}

@ -93,7 +93,7 @@ static void start_timer_thread_and_unlock(void) {
// to leak through g_completed_threads and be freed in gc_completed_threads()
// before "&ct->t" is written to, causing a use-after-free.
gpr_mu_lock(&g_mu);
gpr_thd_new(&ct->t, timer_thread, ct, &opt);
gpr_thd_new(&ct->t, "gpr_timer", timer_thread, ct, &opt);
gpr_mu_unlock(&g_mu);
}

@ -33,17 +33,31 @@
struct thd_arg {
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
const char* name; /* name of thread */
};
/* Body of every thread started via gpr_thd_new. */
static void* thread_body(void* v) {
struct thd_arg a = *(struct thd_arg*)v;
free(v);
if (a.name != NULL) {
#if GPR_APPLE_PTHREAD_NAME
/* Apple supports 64 characters, and will truncate if it's longer. */
pthread_setname_np(a.name);
#elif GPR_LINUX_PTHREAD_NAME
/* Linux supports 16 characters max, and will error if it's longer. */
char buf[16];
strncpy(buf, a.name, sizeof(buf) - 1);
buf[sizeof(buf) - 1] = '\0';
pthread_setname_np(pthread_self(), buf);
#endif // GPR_APPLE_PTHREAD_NAME
}
(*a.body)(a.arg);
return nullptr;
}
int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg,
int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
void (*thd_body)(void* arg), void* arg,
const gpr_thd_options* options) {
int thread_started;
pthread_attr_t attr;
@ -54,7 +68,8 @@ int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg,
GPR_ASSERT(a != nullptr);
a->body = thd_body;
a->arg = arg;
a->name = thd_name;
GPR_ASSERT(pthread_attr_init(&attr) == 0);
if (gpr_thd_options_is_detached(options)) {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==

@ -63,7 +63,8 @@ static DWORD WINAPI thread_body(void* v) {
return 0;
}
int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg,
int gpr_thd_new(gpr_thd_id* t, const char* thd_name,
void (*thd_body)(void* arg), void* arg,
const gpr_thd_options* options) {
HANDLE handle;
struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info));

@ -128,7 +128,7 @@ void grpc_run_bad_client_test(
GPR_ASSERT(grpc_server_has_open_connections(a.server));
/* Start validator */
gpr_thd_new(&id, thd_func, &a, nullptr);
gpr_thd_new(&id, "gpr_bad_client", thd_func, &a, nullptr);
grpc_slice_buffer_init(&outgoing);
grpc_slice_buffer_add(&outgoing, slice);

@ -262,7 +262,7 @@ static void poll_server_until_read_done(test_tcp_server* server,
poll_args* pa = (poll_args*)gpr_malloc(sizeof(*pa));
pa->server = server;
pa->signal_when_done = signal_when_done;
gpr_thd_new(&id, actually_poll_server, pa, nullptr);
gpr_thd_new(&id, "gpr_poll_server", actually_poll_server, pa, nullptr);
}
static void run_test(const char* response_payload,

@ -529,7 +529,8 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
// Start proxy thread.
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt));
GPR_ASSERT(gpr_thd_new(&proxy->thd, "gpr_http_proxy", thread_main, proxy,
&opt));
return proxy;
}

@ -98,7 +98,8 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
grpc_call_details_init(&proxy->new_call_details);
gpr_thd_options_set_joinable(&opt);
GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt));
GPR_ASSERT(gpr_thd_new(&proxy->thd, "gpr_end2end_proxy", thread_main, proxy,
&opt));
request_call(proxy);

@ -68,7 +68,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
ce.cq = f.cq;
gpr_event_init(&ce.started);
gpr_thd_options_set_joinable(&thdopt);
GPR_ASSERT(gpr_thd_new(&thdid, child_thread, &ce, &thdopt));
GPR_ASSERT(gpr_thd_new(&thdid, "gpr_connectivity", child_thread, &ce,
&thdopt));
gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC));

@ -231,7 +231,8 @@ static bool client_ssl_test(char* server_alpn_preferred) {
gpr_thd_id thdid;
gpr_thd_options_set_joinable(&thdopt);
server_args args = {server_socket, server_alpn_preferred};
GPR_ASSERT(gpr_thd_new(&thdid, server_thread, &args, &thdopt));
GPR_ASSERT(gpr_thd_new(&thdid, "gpr_client_ssl_test", server_thread, &args,
&thdopt));
// Load key pair and establish client SSL credentials.
grpc_ssl_pem_key_cert_pair pem_key_cert_pair;

@ -137,7 +137,8 @@ bool server_ssl_test(const char* alpn_list[], unsigned int alpn_list_len,
gpr_thd_options thdopt = gpr_thd_options_default();
gpr_thd_id thdid;
gpr_thd_options_set_joinable(&thdopt);
GPR_ASSERT(gpr_thd_new(&thdid, server_thread, &port, &thdopt));
GPR_ASSERT(gpr_thd_new(&thdid, "gpr_ssl_test", server_thread, &port,
&thdopt));
SSL_load_error_strings();
OpenSSL_add_ssl_algorithms();

@ -112,7 +112,8 @@ static void test_execute_many(void) {
ta[i].ctr = 0;
ta[i].lock = lock;
gpr_event_init(&ta[i].done);
GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options));
GPR_ASSERT(gpr_thd_new(&thds[i], "gpr_execute_many", execute_many_loop,
&ta[i], &options));
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
GPR_ASSERT(gpr_event_wait(&ta[i].done,

@ -104,7 +104,7 @@ static void actually_poll(void* argsp) {
static void poll_pollset_until_request_done(args_struct* args) {
gpr_atm_rel_store(&args->done_atm, 0);
gpr_thd_id id;
gpr_thd_new(&id, actually_poll, args, nullptr);
gpr_thd_new(&id, "gpr_poll_pollset", actually_poll, args, nullptr);
}
static void must_succeed(grpc_exec_ctx* exec_ctx, void* argsp,

@ -138,7 +138,7 @@ void test_poll_cv_trigger(void) {
opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_new(&t_id, "gpr_background_poll", &background_poll, &pargs, &opt);
// Wakeup wakeup_fd not listening for events
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE);
@ -154,7 +154,7 @@ void test_poll_cv_trigger(void) {
// Pollin on socket fd
pargs.timeout = -1;
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_new(&t_id, "gpr_background_poll", &background_poll, &pargs, &opt);
trigger_socket_event();
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 1);
@ -168,7 +168,7 @@ void test_poll_cv_trigger(void) {
// Pollin on wakeup fd
reset_socket_event();
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_new(&t_id, "gpr_background_poll", &background_poll, &pargs, &opt);
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE);
gpr_thd_join(t_id);
@ -182,7 +182,7 @@ void test_poll_cv_trigger(void) {
// Pollin on wakeupfd before poll()
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_new(&t_id, "gpr_background_poll", &background_poll, &pargs, &opt);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 1);
@ -199,7 +199,7 @@ void test_poll_cv_trigger(void) {
reset_socket_event();
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE);
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_new(&t_id, "gpr_background_poll", &background_poll, &pargs, &opt);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 0);

@ -100,7 +100,8 @@ static void concurrent_test(void) {
for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&thds[i], concurrent_test_body, &args, &opt);
gpr_thd_new(&thds[i], "gpr_concurrent_test", concurrent_test_body, &args,
&opt);
}
gpr_event_set(&args.ev_start, (void*)1);

@ -110,7 +110,7 @@ static void cpu_test(void) {
gpr_cv_init(&ct.done_cv);
ct.is_done = 0;
for (i = 0; i < ct.ncores * 3; i++) {
GPR_ASSERT(gpr_thd_new(&thd, &worker_thread, &ct, nullptr));
GPR_ASSERT(gpr_thd_new(&thd, "gpr_cpu_test", &worker_thread, &ct, nullptr));
}
gpr_mu_lock(&ct.mu);
while (!ct.is_done) {

@ -85,7 +85,8 @@ static void test_mt(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options));
GPR_ASSERT(gpr_thd_new(&thds[i], "gpr_test_md", test_thread, &ta[i],
&options));
}
size_t num_done = 0;
size_t spins = 0;
@ -156,7 +157,8 @@ static void test_mt_multipop(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options));
GPR_ASSERT(gpr_thd_new(&thds[i], "gpr_multipop_test", test_thread, &ta[i],
&options));
}
pull_args pa;
pa.ta = ta;
@ -169,7 +171,8 @@ static void test_mt_multipop(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
GPR_ASSERT(gpr_thd_new(&pull_thds[i], pull_thread, &pa, &options));
GPR_ASSERT(gpr_thd_new(&pull_thds[i], "gpr_multipop_pull", pull_thread, &pa,
&options));
}
gpr_event_set(&start, (void*)1);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {

@ -67,7 +67,8 @@ static void test_create_threads(struct test* m, void (*body)(void* arg)) {
for (i = 0; i != m->thread_count; i++) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
GPR_ASSERT(gpr_thd_new(&m->threads[i], body, m, &opt));
GPR_ASSERT(gpr_thd_new(&m->threads[i], "gpr_create_threads", body, m,
&opt));
}
}

@ -189,7 +189,7 @@ static void test_create_threads(struct test* m, void (*body)(void* arg)) {
gpr_thd_id id;
int i;
for (i = 0; i != m->threads; i++) {
GPR_ASSERT(gpr_thd_new(&id, body, m, nullptr));
GPR_ASSERT(gpr_thd_new(&id, "gpr_create_threads", body, m, nullptr));
}
}
@ -244,7 +244,7 @@ static void test(const char* name, void (*body)(void* m),
m = test_new(10, iterations, incr_step);
if (extra != nullptr) {
gpr_thd_id id;
GPR_ASSERT(gpr_thd_new(&id, extra, m, nullptr));
GPR_ASSERT(gpr_thd_new(&id, name, extra, m, nullptr));
m->done++; /* one more thread to wait for */
}
test_create_threads(m, body);

@ -74,7 +74,7 @@ static void test(void) {
t.n = NUM_THREADS;
t.is_done = 0;
for (i = 0; i < NUM_THREADS; i++) {
GPR_ASSERT(gpr_thd_new(&thd, &thd_body, &t, nullptr));
GPR_ASSERT(gpr_thd_new(&thd, "gpr_thread_test", &thd_body, &t, nullptr));
}
gpr_mu_lock(&t.mu);
while (!t.is_done) {
@ -84,7 +84,8 @@ static void test(void) {
GPR_ASSERT(t.n == 0);
gpr_thd_options_set_joinable(&options);
for (i = 0; i < NUM_THREADS; i++) {
GPR_ASSERT(gpr_thd_new(&thds[i], &thd_body_joinable, nullptr, &options));
GPR_ASSERT(gpr_thd_new(&thds[i], "gpr_thread_test_joinable",
&thd_body_joinable, nullptr, &options));
}
for (i = 0; i < NUM_THREADS; i++) {
gpr_thd_join(thds[i]);

@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {
gpr_thd_options_set_joinable(&opt);
for (i = 0; i < NUM_THREADS; i++) {
gpr_thd_new(&threads[i], thd_body, nullptr, &opt);
gpr_thd_new(&threads[i], "gpr_tls_test", thd_body, nullptr, &opt);
}
for (i = 0; i < NUM_THREADS; i++) {
gpr_thd_join(threads[i]);

@ -96,7 +96,8 @@ static void test_too_many_plucks(void) {
}
thread_states[i].cc = cc;
thread_states[i].tag = tags[i];
gpr_thd_new(thread_ids + i, pluck_one, thread_states + i, &thread_options);
gpr_thd_new(thread_ids + i, "gpr_test_pluck", pluck_one, thread_states + i,
&thread_options);
}
/* wait until all other threads are plucking */
@ -233,7 +234,7 @@ static void test_threading(size_t producers, size_t consumers) {
options[i].events_triggered = 0;
options[i].cc = cc;
options[i].id = optid++;
GPR_ASSERT(gpr_thd_new(&id,
GPR_ASSERT(gpr_thd_new(&id, i < producers ? "gpr_producer" : "gpr_consumer",
i < producers ? producer_thread : consumer_thread,
options + i, nullptr));
gpr_event_wait(&options[i].on_started, ten_seconds_time());

@ -180,7 +180,8 @@ int run_concurrent_connectivity_test() {
/* First round, no server */
gpr_log(GPR_DEBUG, "Wave 1");
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_new(&threads[i], create_loop_destroy, localhost, &options);
gpr_thd_new(&threads[i], "gpr_wave_1", create_loop_destroy, localhost,
&options);
}
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_join(threads[i]);
@ -196,10 +197,11 @@ int run_concurrent_connectivity_test() {
args.cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(args.server, args.cq, nullptr);
grpc_server_start(args.server);
gpr_thd_new(&server, server_thread, &args, &options);
gpr_thd_new(&server, "gpr_wave_2_server", server_thread, &args, &options);
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options);
gpr_thd_new(&threads[i], "gpr_wave_2", create_loop_destroy, args.addr,
&options);
}
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_join(threads[i]);
@ -216,11 +218,12 @@ int run_concurrent_connectivity_test() {
args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args.pollset, &args.mu);
gpr_event_init(&args.ready);
gpr_thd_new(&server, bad_server_thread, &args, &options);
gpr_thd_new(&server, "gpr_wave_3_server", bad_server_thread, &args, &options);
gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options);
gpr_thd_new(&threads[i], "gpr_wave_3", create_loop_destroy, args.addr,
&options);
}
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_join(threads[i]);
@ -278,7 +281,8 @@ int run_concurrent_watches_with_short_timeouts_test() {
gpr_thd_options_set_joinable(&options);
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_new(&threads[i], watches_with_short_timeouts, localhost, &options);
gpr_thd_new(&threads[i], "gpr_watches_with_short_timeouts",
watches_with_short_timeouts, localhost, &options);
}
for (size_t i = 0; i < NUM_THREADS; ++i) {
gpr_thd_join(threads[i]);

@ -70,7 +70,8 @@ static void run_test(const test_fixture* fixture) {
gpr_thd_id server_thread;
gpr_thd_options thdopt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&thdopt);
gpr_thd_new(&server_thread, server_thread_func, &sta, &thdopt);
gpr_thd_new(&server_thread, "gpr_server_thread", server_thread_func, &sta,
&thdopt);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_channel* channels[NUM_CONNECTIONS];

Loading…
Cancel
Save