From 2fe87b09055cd256cdce038c4c70d92b955c991b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 1 Mar 2018 11:36:47 -0800 Subject: [PATCH] Move assignment for Thread, make destructor optional, loop cv waits --- src/core/lib/gprpp/thd.h | 95 +++++++- src/core/lib/gprpp/thd_posix.cc | 221 +++++++++--------- src/core/lib/gprpp/thd_windows.cc | 156 +++++++------ src/core/lib/iomgr/ev_poll_posix.cc | 21 +- src/core/lib/iomgr/executor.cc | 8 +- src/core/lib/iomgr/timer_manager.cc | 4 +- src/core/lib/profiling/basic_timers.cc | 6 +- src/core/tsi/alts_transport_security.cc | 1 - .../end2end/fixtures/http_proxy_fixture.cc | 4 +- test/core/end2end/fixtures/proxy.cc | 4 +- test/core/gpr/arena_test.cc | 3 +- test/core/gpr/cpu_test.cc | 4 +- test/core/gpr/mpscq_test.cc | 8 +- test/core/gpr/spinlock_test.cc | 4 +- test/core/gpr/sync_test.cc | 6 +- test/core/gpr/tls_test.cc | 3 +- test/core/gprpp/thd_test.cc | 6 +- test/core/iomgr/combiner_test.cc | 5 +- test/core/iomgr/ev_epollsig_linux_test.cc | 3 +- test/core/iomgr/resolve_address_posix_test.cc | 3 +- .../completion_queue_threading_test.cc | 7 +- .../surface/concurrent_connectivity_test.cc | 14 +- 22 files changed, 330 insertions(+), 256 deletions(-) diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h index f45e78e7f62..2f868935195 100644 --- a/src/core/lib/gprpp/thd.h +++ b/src/core/lib/gprpp/thd.h @@ -28,34 +28,105 @@ #include #include +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/memory.h" + namespace grpc_core { +namespace internal { + +/// Base class for platform-specific thread-state +class ThreadInternalsInterface { + public: + virtual ~ThreadInternalsInterface() {} + virtual void Start() GRPC_ABSTRACT; + virtual void Join() GRPC_ABSTRACT; + GRPC_ABSTRACT_BASE_CLASS +}; +} // namespace internal class Thread { public: /// Default constructor only to allow use in structs that lack constructors /// Does not produce a validly-constructed thread; must later /// use placement new to construct a real thread. Does not init mu_ and cv_ - Thread() : real_(false), alive_(false), started_(false), joined_(false) {} + Thread() : state_(FAKE), impl_(nullptr) {} + /// Normal constructor to create a thread with name \a thd_name, + /// which will execute a thread based on function \a thd_body + /// with argument \a arg once it is started. + /// The optional \a success argument indicates whether the thread + /// is successfully created. Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, bool* success = nullptr); - ~Thread(); - void Start(); - void Join(); + /// Move constructor for thread. After this is called, the other thread + /// no longer represents a living thread object + Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) { + other.state_ = MOVED; + other.impl_ = nullptr; + } + + /// Move assignment operator for thread. After this is called, the other + /// thread no longer represents a living thread object. Not allowed if this + /// thread actually exists + Thread& operator=(Thread&& other) { + if (this != &other) { + // TODO(vjpai): if we can be sure that all Thread's are actually + // constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here. + // However, as long as threads come in structures that are + // allocated via gpr_malloc, this will not be the case, so we cannot + // assert it for the time being. + state_ = other.state_; + impl_ = other.impl_; + other.state_ = MOVED; + other.impl_ = nullptr; + } + return *this; + } + + /// The destructor is strictly optional; either the thread never came to life + /// and the constructor itself killed it or it has already been joined and + /// the Join function kills it. The destructor shouldn't have to do anything. + ~Thread() { GPR_ASSERT(impl_ == nullptr); } + + void Start() { + if (impl_ != nullptr) { + GPR_ASSERT(state_ == ALIVE); + state_ = STARTED; + impl_->Start(); + } else { + GPR_ASSERT(state_ == FAILED); + } + }; + void Join() { + if (impl_ != nullptr) { + GPR_ASSERT(state_ == STARTED); + impl_->Join(); + grpc_core::Delete(impl_); + state_ = DONE; + impl_ = nullptr; + } else { + GPR_ASSERT(state_ == FAILED); + } + }; static void Init(); static bool AwaitAll(gpr_timespec deadline); private: - gpr_mu mu_; - gpr_cv ready_; - - gpr_thd_id id_; - bool real_; - bool alive_; - bool started_; - bool joined_; + Thread(const Thread&) = delete; + Thread& operator=(const Thread&) = delete; + + /// The thread states are as follows: + /// FAKE -- just a dummy placeholder Thread created by the default constructor + /// ALIVE -- an actual thread of control exists associated with this thread + /// STARTED -- the thread of control has been started + /// DONE -- the thread of control has completed and been joined + /// FAILED -- the thread of control never came alive + /// MOVED -- contents were moved out and we're no longer tracking them + enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED }; + ThreadState state_; + internal::ThreadInternalsInterface* impl_; }; } // namespace grpc_core diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc index 4ded4d3fd5a..28e47f1aa99 100644 --- a/src/core/lib/gprpp/thd_posix.cc +++ b/src/core/lib/gprpp/thd_posix.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/fork.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { namespace { @@ -42,131 +43,142 @@ gpr_cv g_cv; int g_thread_count; int g_awaiting_threads; +class ThreadInternalsPosix; struct thd_arg { - Thread* thread; + ThreadInternalsPosix* thread; void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ const char* name; /* name of thread. Can be nullptr. */ }; -/***************************************** - * Only used when fork support is enabled - */ +class ThreadInternalsPosix + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg), + void* arg, bool* success) + : started_(false) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + pthread_attr_t attr; + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + thd_arg* info = static_cast(malloc(sizeof(*info))); + GPR_ASSERT(info != nullptr); + info->thread = this; + info->body = thd_body; + info->arg = arg; + info->name = thd_name; + inc_thd_count(); + + GPR_ASSERT(pthread_attr_init(&attr) == 0); + GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == + 0); + + *success = + (pthread_create(&pthread_id_, &attr, + [](void* v) -> void* { + thd_arg arg = *static_cast(v); + free(v); + if (arg.name != nullptr) { +#if GPR_APPLE_PTHREAD_NAME + /* Apple supports 64 characters, and will + * truncate if it's longer. */ + pthread_setname_np(arg.name); +#elif GPR_LINUX_PTHREAD_NAME + /* Linux supports 16 characters max, and will + * error if it's longer. */ + char buf[16]; + size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; + strncpy(buf, arg.name, buf_len); + buf[buf_len] = '\0'; + pthread_setname_np(pthread_self(), buf); +#endif // GPR_APPLE_PTHREAD_NAME + } + + gpr_mu_lock(&arg.thread->mu_); + while (!arg.thread->started_) { + gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&arg.thread->mu_); + + (*arg.body)(arg.arg); + dec_thd_count(); + return nullptr; + }, + info) == 0); + + GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + + if (!success) { + /* don't use gpr_free, as this was allocated using malloc (see above) */ + free(info); + dec_thd_count(); + } + }; -void inc_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count++; - gpr_mu_unlock(&g_mu); + ~ThreadInternalsPosix() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); } -} -void dec_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count--; - if (g_awaiting_threads && g_thread_count == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); } -} -} // namespace + void Join() override { pthread_join(pthread_id_, nullptr); } -Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success) - : real_(true), alive_(false), started_(false), joined_(false) { - gpr_mu_init(&mu_); - gpr_cv_init(&ready_); - pthread_attr_t attr; - /* don't use gpr_malloc as we may cause an infinite recursion with - * the profiling code */ - thd_arg* a = static_cast(malloc(sizeof(*a))); - GPR_ASSERT(a != nullptr); - a->thread = this; - a->body = thd_body; - a->arg = arg; - a->name = thd_name; - inc_thd_count(); - - GPR_ASSERT(pthread_attr_init(&attr) == 0); - GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0); - - pthread_t p; - alive_ = (pthread_create(&p, &attr, - [](void* v) -> void* { - thd_arg a = *static_cast(v); - free(v); - if (a.name != nullptr) { -#if GPR_APPLE_PTHREAD_NAME - /* Apple supports 64 characters, and will - * truncate if it's longer. */ - pthread_setname_np(a.name); -#elif GPR_LINUX_PTHREAD_NAME - /* Linux supports 16 characters max, and will - * error if it's longer. */ - char buf[16]; - size_t buf_len = GPR_ARRAY_SIZE(buf) - 1; - strncpy(buf, a.name, buf_len); - buf[buf_len] = '\0'; - pthread_setname_np(pthread_self(), buf); -#endif // GPR_APPLE_PTHREAD_NAME - } + private: + /***************************************** + * Only used when fork support is enabled + */ - gpr_mu_lock(&a.thread->mu_); - if (!a.thread->started_) { - gpr_cv_wait(&a.thread->ready_, &a.thread->mu_, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - } - gpr_mu_unlock(&a.thread->mu_); - - (*a.body)(a.arg); - dec_thd_count(); - return nullptr; - }, - a) == 0); + static void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } + } - if (success != nullptr) { - *success = alive_; + static void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } } - id_ = gpr_thd_id(p); - GPR_ASSERT(pthread_attr_destroy(&attr) == 0); + gpr_mu mu_; + gpr_cv ready_; + bool started_; + pthread_t pthread_id_; +}; - if (!alive_) { - /* don't use gpr_free, as this was allocated using malloc (see above) */ - free(a); - dec_thd_count(); - } -} +} // namespace -Thread::~Thread() { - if (!alive_) { - // This thread never existed, so nothing to do +Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, + bool* success) { + bool outcome; + impl_ = + grpc_core::New(thd_name, thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; } else { - GPR_ASSERT(joined_); - } - if (real_) { - gpr_mu_destroy(&mu_); - gpr_cv_destroy(&ready_); + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; } -} -void Thread::Start() { - gpr_mu_lock(&mu_); - if (alive_) { - started_ = true; - gpr_cv_signal(&ready_); - } - gpr_mu_unlock(&mu_); -} - -void Thread::Join() { - if (alive_) { - pthread_join(pthread_t(id_), nullptr); + if (success != nullptr) { + *success = outcome; } - joined_ = true; } void Thread::Init() { @@ -180,7 +192,8 @@ bool Thread::AwaitAll(gpr_timespec deadline) { gpr_mu_lock(&g_mu); g_awaiting_threads = 1; int res = 0; - if (g_thread_count > 0) { + while ((g_thread_count > 0) && + (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) { res = gpr_cv_wait(&g_cv, &g_mu, deadline); } g_awaiting_threads = 0; diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc index efbed30ac66..e13c2f63d18 100644 --- a/src/core/lib/gprpp/thd_windows.cc +++ b/src/core/lib/gprpp/thd_windows.cc @@ -29,6 +29,8 @@ #include #include +#include "src/core/lib/gprpp/memory.h" + #if defined(_MSC_VER) #define thread_local __declspec(thread) #define WIN_LAMBDA @@ -40,8 +42,9 @@ #endif namespace { +class ThreadInternalsWindows; struct thd_info { - grpc_core::Thread* thread; + ThreadInternalsWindows* thread; void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ HANDLE join_event; /* the join event */ @@ -54,6 +57,77 @@ void destroy_thread(struct thd_info* t) { CloseHandle(t->join_event); gpr_free(t); } + +class ThreadInternalsWindows + : public grpc_core::internal::ThreadInternalsInterface { + public: + ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, + bool* success) { + gpr_mu_init(&mu_); + gpr_cv_init(&ready_); + + HANDLE handle; + info_ = (struct thd_info*)gpr_malloc(sizeof(*info_)); + info->thread = this; + info->body = thd_body; + info->arg = arg; + + info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); + if (info->join_event == nullptr) { + gpr_free(info_); + *success = false; + } else { + handle = CreateThread( + nullptr, 64 * 1024, + [](void* v) WIN_LAMBDA -> DWORD { + g_thd_info = static_cast(v); + gpr_mu_lock(&g_thd_info->thread->mu_); + while (!g_thd_info->thread->started_) { + gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + gpr_mu_unlock(&g_thd_info->thread->mu_); + g_thd_info->body(g_thd_info->arg); + BOOL ret = SetEvent(g_thd_info->join_event); + GPR_ASSERT(ret); + return 0; + }, + info, 0, nullptr); + if (handle == nullptr) { + destroy_thread(info_); + *success_ = false; + } else { + CloseHandle(handle); + *success = true; + } + } + } + + ~ThreadInternalsWindows() override { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&ready_); + } + + void Start() override { + gpr_mu_lock(&mu_); + started_ = true; + gpr_cv_signal(&ready_); + gpr_mu_unlock(&mu_); + } + + void Join() override { + DWORD ret = WaitForSingleObject(info_->join_event, INFINITE); + GPR_ASSERT(ret == WAIT_OBJECT_0); + destroy_thread(info_); + } + + private: + gpr_mu mu_; + gpr_cv ready_; + bool started_; + thd_info* info_; +}; + } // namespace namespace grpc_core { @@ -66,82 +140,22 @@ bool Thread::AwaitAll(gpr_timespec deadline) { } Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, - bool* success) - : real_(true), alive_(false), started_(false), joined_(false) { - gpr_mu_init(&mu_); - gpr_cv_init(&ready_); - - HANDLE handle; - struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info)); - info->thread = this; - info->body = thd_body; - info->arg = arg; - - info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr); - if (info->join_event == nullptr) { - gpr_free(info); - alive_ = false; + bool* success) { + bool outcome; + impl_ = grpc_core::New(thd_body, arg, &outcome); + if (outcome) { + state_ = ALIVE; } else { - handle = CreateThread(nullptr, 64 * 1024, - [](void* v) WIN_LAMBDA -> DWORD { - g_thd_info = static_cast(v); - gpr_mu_lock(&g_thd_info->thread->mu_); - if (!g_thd_info->thread->started_) { - gpr_cv_wait(&g_thd_info->thread->ready_, - &g_thd_info->thread->mu_, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - } - gpr_mu_unlock(&g_thd_info->thread->mu_); - g_thd_info->body(g_thd_info->arg); - BOOL ret = SetEvent(g_thd_info->join_event); - GPR_ASSERT(ret); - return 0; - }, - info, 0, nullptr); - if (handle == nullptr) { - destroy_thread(info); - alive_ = false; - } else { - id_ = (gpr_thd_id)info; - CloseHandle(handle); - alive_ = true; - } - } - if (success != nullptr) { - *success = alive_; - } -} - -Thread::~Thread() { - if (!alive_) { - // This thread never existed, so nothing to do - } else { - GPR_ASSERT(joined_); - } - if (real_) { - gpr_mu_destroy(&mu_); - gpr_cv_destroy(&ready_); + state_ = FAILED; + grpc_core::Delete(impl_); + impl_ = nullptr; } -} -void Thread::Start() { - gpr_mu_lock(&mu_); - if (alive_) { - started_ = true; - gpr_cv_signal(&ready_); + if (success != nullptr) { + *success = outcome; } - gpr_mu_unlock(&mu_); } -void Thread::Join() { - if (alive_) { - thd_info* info = (thd_info*)id_; - DWORD ret = WaitForSingleObject(info->join_event, INFINITE); - GPR_ASSERT(ret == WAIT_OBJECT_0); - destroy_thread(info); - } - joined_ = true; -} } // namespace grpc_core gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; } diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 4fc8ce9ecef..6120f9f44b7 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -31,7 +31,6 @@ #include #include #include -#include #include #include @@ -259,7 +258,9 @@ typedef struct poll_args { grpc_core::Thread poller_thd; gpr_cv trigger; int trigger_set; + bool harvestable; gpr_cv harvest; + bool joinable; gpr_cv join; struct pollfd* fds; nfds_t nfds; @@ -1372,6 +1373,8 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { gpr_cv_init(&pargs->trigger); gpr_cv_init(&pargs->harvest); gpr_cv_init(&pargs->join); + pargs->harvestable = false; + pargs->joinable = false; pargs->fds = fds; pargs->nfds = count; pargs->next = nullptr; @@ -1380,7 +1383,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { init_result(pargs); cache_poller_locked(pargs); gpr_ref(&g_cvfds.pollcount); - new (&pargs->poller_thd) grpc_core::Thread("grpc_poller", &run_poll, pargs); + pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs); pargs->poller_thd.Start(); return pargs; } @@ -1464,10 +1467,13 @@ static void cache_harvest_locked() { if (poll_cache.dead_pollers) { poll_cache.dead_pollers->prev = nullptr; } + args->harvestable = true; gpr_cv_signal(&args->harvest); - gpr_cv_wait(&args->join, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + while (!args->joinable) { + gpr_cv_wait(&args->join, &g_cvfds.mu, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } args->poller_thd.Join(); - args->poller_thd.~Thread(); gpr_free(args); } } @@ -1533,8 +1539,11 @@ static void run_poll(void* args) { if (gpr_unref(&g_cvfds.pollcount)) { gpr_cv_signal(&g_cvfds.shutdown_cv); } - gpr_cv_wait(&pargs->harvest, &g_cvfds.mu, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + while (!pargs->harvestable) { + gpr_cv_wait(&pargs->harvest, &g_cvfds.mu, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + pargs->joinable = true; gpr_cv_signal(&pargs->join); gpr_mu_unlock(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 74e530e8980..b017db53f8a 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -21,7 +21,6 @@ #include "src/core/lib/iomgr/executor.h" #include -#include #include #include @@ -102,11 +101,11 @@ void grpc_executor_set_threading(bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); - new (&g_thread_state[i].thd) grpc_core::Thread(); + g_thread_state[i].thd = grpc_core::Thread(); g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } - new (&g_thread_state[0].thd) + g_thread_state[0].thd = grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]); g_thread_state[0].thd.Start(); } else { @@ -126,7 +125,6 @@ void grpc_executor_set_threading(bool threading) { } gpr_atm_no_barrier_store(&g_cur_threads, 0); for (size_t i = 0; i < g_max_threads; i++) { - g_thread_state[i].thd.~Thread(); gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); run_closures(g_thread_state[i].elems); @@ -266,7 +264,7 @@ static void executor_push(grpc_closure* closure, grpc_error* error, if (cur_thread_count < g_max_threads) { gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - new (&g_thread_state[cur_thread_count].thd) + g_thread_state[cur_thread_count].thd = grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[cur_thread_count]); g_thread_state[cur_thread_count].thd.Start(); diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 7efbaa83646..94f288af276 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include @@ -69,7 +68,6 @@ static void gc_completed_threads(void) { gpr_mu_unlock(&g_mu); while (to_gc != nullptr) { to_gc->thd.Join(); - to_gc->thd.~Thread(); completed_thread* next = to_gc->next; gpr_free(to_gc); to_gc = next; @@ -88,7 +86,7 @@ static void start_timer_thread_and_unlock(void) { } completed_thread* ct = static_cast(gpr_malloc(sizeof(*ct))); - new (&ct->thd) grpc_core::Thread("grpc_global_timer", timer_thread, ct); + ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct); ct->thd.Start(); } diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index 97646d10008..43384fd0cad 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -183,7 +183,7 @@ static void finish_writing(void) { pthread_cond_signal(&g_cv); pthread_mutex_unlock(&g_mu); g_writing_thread->Join(); - delete g_writing_thread; + grpc_core::Delete(g_writing_thread); gpr_log(GPR_INFO, "flushing logs"); @@ -202,8 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) { } static void init_output() { - g_writing_thread = - new grpc_core::Thread("timer_output_thread", writing_thread, nullptr); + g_writing_thread = grpc_core::New("timer_output_thread", + writing_thread, nullptr); atexit(finish_writing); } diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc index 67798718b78..2fd408103bf 100644 --- a/src/core/tsi/alts_transport_security.cc +++ b/src/core/tsi/alts_transport_security.cc @@ -57,7 +57,6 @@ void grpc_tsi_alts_shutdown() { grpc_completion_queue_destroy(g_alts_resource.cq); grpc_channel_destroy(g_alts_resource.channel); g_alts_resource.thread.Join(); - g_alts_resource.thread.~Thread(); } gpr_cv_destroy(&g_alts_resource.cv); gpr_mu_destroy(&g_alts_resource.mu); diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index d074fdaa52e..58353376f3f 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -21,7 +21,6 @@ #include "src/core/lib/iomgr/sockaddr.h" #include -#include #include #include @@ -551,7 +550,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy); // Start proxy thread. - new (&proxy->thd) grpc_core::Thread("grpc_http_proxy", thread_main, proxy); + proxy->thd = grpc_core::Thread("grpc_http_proxy", thread_main, proxy); proxy->thd.Start(); return proxy; } @@ -566,7 +565,6 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { gpr_unref(&proxy->users); // Signal proxy thread to shutdown. grpc_core::ExecCtx exec_ctx; proxy->thd.Join(); - proxy->thd.~Thread(); grpc_tcp_server_shutdown_listeners(proxy->server); grpc_tcp_server_unref(proxy->server); gpr_free(proxy->proxy_name); diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc index 5b4f3b92365..042c858b4cb 100644 --- a/test/core/end2end/fixtures/proxy.cc +++ b/test/core/end2end/fixtures/proxy.cc @@ -19,7 +19,6 @@ #include "test/core/end2end/fixtures/proxy.h" #include -#include #include #include @@ -98,7 +97,7 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def, grpc_server_start(proxy->server); grpc_call_details_init(&proxy->new_call_details); - new (&proxy->thd) grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy); + proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy); proxy->thd.Start(); request_call(proxy); @@ -123,7 +122,6 @@ void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) { grpc_server_shutdown_and_notify(proxy->server, proxy->cq, new_closure(shutdown_complete, proxy)); proxy->thd.Join(); - proxy->thd.~Thread(); gpr_free(proxy->proxy_port); gpr_free(proxy->server_port); grpc_server_destroy(proxy->server); diff --git a/test/core/gpr/arena_test.cc b/test/core/gpr/arena_test.cc index b00c014cea3..111414ea3e3 100644 --- a/test/core/gpr/arena_test.cc +++ b/test/core/gpr/arena_test.cc @@ -20,7 +20,6 @@ #include #include -#include #include #include @@ -102,7 +101,7 @@ static void concurrent_test(void) { grpc_core::Thread thds[CONCURRENT_TEST_THREADS]; for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { - new (&thds[i]) + thds[i] = grpc_core::Thread("grpc_concurrent_test", concurrent_test_body, &args); thds[i].Start(); } diff --git a/test/core/gpr/cpu_test.cc b/test/core/gpr/cpu_test.cc index 279e6e6f5a2..1052d40b421 100644 --- a/test/core/gpr/cpu_test.cc +++ b/test/core/gpr/cpu_test.cc @@ -25,7 +25,6 @@ #include #include -#include #include #include @@ -118,7 +117,7 @@ static void cpu_test(void) { static_cast(gpr_malloc(sizeof(*thd) * nthreads)); for (i = 0; i < nthreads; i++) { - new (&thd[i]) grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct); + thd[i] = grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct); thd[i].Start(); } gpr_mu_lock(&ct.mu); @@ -128,7 +127,6 @@ static void cpu_test(void) { gpr_mu_unlock(&ct.mu); for (i = 0; i < nthreads; i++) { thd[i].Join(); - thd[i].~Thread(); } gpr_free(thd); fprintf(stderr, "Saw cores ["); diff --git a/test/core/gpr/mpscq_test.cc b/test/core/gpr/mpscq_test.cc index 1e929fcf337..8c0873941f4 100644 --- a/test/core/gpr/mpscq_test.cc +++ b/test/core/gpr/mpscq_test.cc @@ -19,7 +19,6 @@ #include "src/core/lib/gpr/mpscq.h" #include -#include #include #include @@ -85,7 +84,7 @@ static void test_mt(void) { ta[i].ctr = 0; ta[i].q = &q; ta[i].start = &start; - new (&thds[i]) grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]); + thds[i] = grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]); thds[i].Start(); } size_t num_done = 0; @@ -155,7 +154,7 @@ static void test_mt_multipop(void) { ta[i].ctr = 0; ta[i].q = &q; ta[i].start = &start; - new (&thds[i]) grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]); + thds[i] = grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]); thds[i].Start(); } pull_args pa; @@ -167,8 +166,7 @@ static void test_mt_multipop(void) { pa.start = &start; gpr_mu_init(&pa.mu); for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { - new (&pull_thds[i]) - grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa); + pull_thds[i] = grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa); pull_thds[i].Start(); } gpr_event_set(&start, (void*)1); diff --git a/test/core/gpr/spinlock_test.cc b/test/core/gpr/spinlock_test.cc index ac9f70f3012..0ee72edb153 100644 --- a/test/core/gpr/spinlock_test.cc +++ b/test/core/gpr/spinlock_test.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -69,7 +68,7 @@ static void test_destroy(struct test* m) { static void test_create_threads(struct test* m, void (*body)(void* arg)) { int i; for (i = 0; i != m->thread_count; i++) { - new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m); + m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m); m->threads[i].Start(); } } @@ -79,7 +78,6 @@ static void test_wait(struct test* m) { int i; for (i = 0; i != m->thread_count; i++) { m->threads[i].Join(); - m->threads[i].~Thread(); } } diff --git a/test/core/gpr/sync_test.cc b/test/core/gpr/sync_test.cc index 487f394b149..24b4562819f 100644 --- a/test/core/gpr/sync_test.cc +++ b/test/core/gpr/sync_test.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -196,7 +195,7 @@ static void test_destroy(struct test* m) { static void test_create_threads(struct test* m, void (*body)(void* arg)) { int i; for (i = 0; i != m->nthreads; i++) { - new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m); + m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m); m->threads[i].Start(); } } @@ -210,7 +209,6 @@ static void test_wait(struct test* m) { gpr_mu_unlock(&m->mu); for (int i = 0; i != m->nthreads; i++) { m->threads[i].Join(); - m->threads[i].~Thread(); } } @@ -258,7 +256,7 @@ static void test(const char* name, void (*body)(void* m), m = test_new(10, iterations, incr_step); grpc_core::Thread extra_thd; if (extra != nullptr) { - new (&extra_thd) grpc_core::Thread(name, extra, m); + extra_thd = grpc_core::Thread(name, extra, m); extra_thd.Start(); m->done++; /* one more thread to wait for */ } diff --git a/test/core/gpr/tls_test.cc b/test/core/gpr/tls_test.cc index a060cd47f10..0502fc7ef4d 100644 --- a/test/core/gpr/tls_test.cc +++ b/test/core/gpr/tls_test.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -56,7 +55,7 @@ int main(int argc, char* argv[]) { gpr_tls_init(&test_var); for (auto& th : threads) { - new (&th) grpc_core::Thread("grpc_tls_test", thd_body, nullptr); + th = grpc_core::Thread("grpc_tls_test", thd_body, nullptr); th.Start(); } for (auto& th : threads) { diff --git a/test/core/gprpp/thd_test.cc b/test/core/gprpp/thd_test.cc index a126784e724..82dd681049b 100644 --- a/test/core/gprpp/thd_test.cc +++ b/test/core/gprpp/thd_test.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -60,7 +59,7 @@ static void test1(void) { t.n = NUM_THREADS; t.is_done = 0; for (auto& th : thds) { - new (&th) grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t); + th = grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t); th.Start(); } gpr_mu_lock(&t.mu); @@ -81,8 +80,7 @@ static void test2(void) { grpc_core::Thread thds[NUM_THREADS]; for (auto& th : thds) { bool ok; - new (&th) - grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok); + th = grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok); GPR_ASSERT(ok); th.Start(); } diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index e4f4783e58c..cf2c7db8462 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -18,8 +18,6 @@ #include "src/core/lib/iomgr/combiner.h" -#include - #include #include #include @@ -105,8 +103,7 @@ static void test_execute_many(void) { ta[i].ctr = 0; ta[i].lock = lock; gpr_event_init(&ta[i].done); - new (&thds[i]) - grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]); + thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]); thds[i].Start(); } for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index 4ec6fc8b0d4..c3ba6d7c149 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -262,7 +261,7 @@ static void test_threading(void) { grpc_core::Thread thds[10]; for (auto& th : thds) { - new (&th) grpc_core::Thread("test_thread", test_threading_loop, &shared); + th = grpc_core::Thread("test_thread", test_threading_loop, &shared); th.Start(); } grpc_wakeup_fd fd; diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc index 895ede24c7f..79b2b50e702 100644 --- a/test/core/iomgr/resolve_address_posix_test.cc +++ b/test/core/iomgr/resolve_address_posix_test.cc @@ -20,7 +20,6 @@ #include #include -#include #include #include @@ -106,7 +105,7 @@ static void actually_poll(void* argsp) { static void poll_pollset_until_request_done(args_struct* args) { gpr_atm_rel_store(&args->done_atm, 0); - new (&args->thd) grpc_core::Thread("grpc_poll_pollset", actually_poll, args); + args->thd = grpc_core::Thread("grpc_poll_pollset", actually_poll, args); args->thd.Start(); } diff --git a/test/core/surface/completion_queue_threading_test.cc b/test/core/surface/completion_queue_threading_test.cc index 1a76d7e6aee..9c8d8d8395d 100644 --- a/test/core/surface/completion_queue_threading_test.cc +++ b/test/core/surface/completion_queue_threading_test.cc @@ -18,8 +18,6 @@ #include "src/core/lib/surface/completion_queue.h" -#include - #include #include #include @@ -96,7 +94,7 @@ static void test_too_many_plucks(void) { } thread_states[i].cc = cc; thread_states[i].tag = tags[i]; - new (&threads[i]) + threads[i] = grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i); threads[i].Start(); } @@ -234,7 +232,7 @@ static void test_threading(size_t producers, size_t consumers) { options[i].id = optid++; bool ok; - new (&threads[i]) grpc_core::Thread( + threads[i] = grpc_core::Thread( i < producers ? "grpc_producer" : "grpc_consumer", i < producers ? producer_thread : consumer_thread, options + i, &ok); GPR_ASSERT(ok); @@ -273,7 +271,6 @@ static void test_threading(size_t producers, size_t consumers) { for (i = 0; i < producers + consumers; i++) { threads[i].Join(); - threads[i].~Thread(); } gpr_free(threads); diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 32b4ae1da8a..c1298b66360 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -24,7 +24,6 @@ #include #include -#include #include #include @@ -179,8 +178,7 @@ int run_concurrent_connectivity_test() { char* localhost = gpr_strdup("localhost:54321"); grpc_core::Thread threads[NUM_THREADS]; for (auto& th : threads) { - new (&th) - grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); + th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); th.Start(); } for (auto& th : threads) { @@ -204,8 +202,7 @@ int run_concurrent_connectivity_test() { grpc_core::Thread threads[NUM_THREADS]; for (auto& th : threads) { - new (&th) - grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); + th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); th.Start(); } for (auto& th : threads) { @@ -231,8 +228,7 @@ int run_concurrent_connectivity_test() { grpc_core::Thread threads[NUM_THREADS]; for (auto& th : threads) { - new (&th) - grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); + th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); th.Start(); } for (auto& th : threads) { @@ -291,8 +287,8 @@ int run_concurrent_watches_with_short_timeouts_test() { char* localhost = gpr_strdup("localhost:54321"); for (auto& th : threads) { - new (&th) grpc_core::Thread("grpc_short_watches", - watches_with_short_timeouts, localhost); + th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts, + localhost); th.Start(); } for (auto& th : threads) {