Move assignment for Thread, make destructor optional, loop cv waits

pull/14583/head
Vijay Pai 7 years ago
parent 8ffa1ae933
commit 2fe87b0905
  1. 95
      src/core/lib/gprpp/thd.h
  2. 221
      src/core/lib/gprpp/thd_posix.cc
  3. 156
      src/core/lib/gprpp/thd_windows.cc
  4. 21
      src/core/lib/iomgr/ev_poll_posix.cc
  5. 8
      src/core/lib/iomgr/executor.cc
  6. 4
      src/core/lib/iomgr/timer_manager.cc
  7. 6
      src/core/lib/profiling/basic_timers.cc
  8. 1
      src/core/tsi/alts_transport_security.cc
  9. 4
      test/core/end2end/fixtures/http_proxy_fixture.cc
  10. 4
      test/core/end2end/fixtures/proxy.cc
  11. 3
      test/core/gpr/arena_test.cc
  12. 4
      test/core/gpr/cpu_test.cc
  13. 8
      test/core/gpr/mpscq_test.cc
  14. 4
      test/core/gpr/spinlock_test.cc
  15. 6
      test/core/gpr/sync_test.cc
  16. 3
      test/core/gpr/tls_test.cc
  17. 6
      test/core/gprpp/thd_test.cc
  18. 5
      test/core/iomgr/combiner_test.cc
  19. 3
      test/core/iomgr/ev_epollsig_linux_test.cc
  20. 3
      test/core/iomgr/resolve_address_posix_test.cc
  21. 7
      test/core/surface/completion_queue_threading_test.cc
  22. 14
      test/core/surface/concurrent_connectivity_test.cc

@ -28,34 +28,105 @@
#include <grpc/support/thd_id.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
namespace internal {
/// Base class for platform-specific thread-state
class ThreadInternalsInterface {
public:
virtual ~ThreadInternalsInterface() {}
virtual void Start() GRPC_ABSTRACT;
virtual void Join() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
} // namespace internal
class Thread {
public:
/// Default constructor only to allow use in structs that lack constructors
/// Does not produce a validly-constructed thread; must later
/// use placement new to construct a real thread. Does not init mu_ and cv_
Thread() : real_(false), alive_(false), started_(false), joined_(false) {}
Thread() : state_(FAKE), impl_(nullptr) {}
/// Normal constructor to create a thread with name \a thd_name,
/// which will execute a thread based on function \a thd_body
/// with argument \a arg once it is started.
/// The optional \a success argument indicates whether the thread
/// is successfully created.
Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success = nullptr);
~Thread();
void Start();
void Join();
/// Move constructor for thread. After this is called, the other thread
/// no longer represents a living thread object
Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
other.state_ = MOVED;
other.impl_ = nullptr;
}
/// Move assignment operator for thread. After this is called, the other
/// thread no longer represents a living thread object. Not allowed if this
/// thread actually exists
Thread& operator=(Thread&& other) {
if (this != &other) {
// TODO(vjpai): if we can be sure that all Thread's are actually
// constructed, then we should assert GPR_ASSERT(impl_ == nullptr) here.
// However, as long as threads come in structures that are
// allocated via gpr_malloc, this will not be the case, so we cannot
// assert it for the time being.
state_ = other.state_;
impl_ = other.impl_;
other.state_ = MOVED;
other.impl_ = nullptr;
}
return *this;
}
/// The destructor is strictly optional; either the thread never came to life
/// and the constructor itself killed it or it has already been joined and
/// the Join function kills it. The destructor shouldn't have to do anything.
~Thread() { GPR_ASSERT(impl_ == nullptr); }
void Start() {
if (impl_ != nullptr) {
GPR_ASSERT(state_ == ALIVE);
state_ = STARTED;
impl_->Start();
} else {
GPR_ASSERT(state_ == FAILED);
}
};
void Join() {
if (impl_ != nullptr) {
GPR_ASSERT(state_ == STARTED);
impl_->Join();
grpc_core::Delete(impl_);
state_ = DONE;
impl_ = nullptr;
} else {
GPR_ASSERT(state_ == FAILED);
}
};
static void Init();
static bool AwaitAll(gpr_timespec deadline);
private:
gpr_mu mu_;
gpr_cv ready_;
gpr_thd_id id_;
bool real_;
bool alive_;
bool started_;
bool joined_;
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
/// The thread states are as follows:
/// FAKE -- just a dummy placeholder Thread created by the default constructor
/// ALIVE -- an actual thread of control exists associated with this thread
/// STARTED -- the thread of control has been started
/// DONE -- the thread of control has completed and been joined
/// FAILED -- the thread of control never came alive
/// MOVED -- contents were moved out and we're no longer tracking them
enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
ThreadState state_;
internal::ThreadInternalsInterface* impl_;
};
} // namespace grpc_core

@ -34,6 +34,7 @@
#include "src/core/lib/gpr/fork.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
namespace {
@ -42,131 +43,142 @@ gpr_cv g_cv;
int g_thread_count;
int g_awaiting_threads;
class ThreadInternalsPosix;
struct thd_arg {
Thread* thread;
ThreadInternalsPosix* thread;
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
const char* name; /* name of thread. Can be nullptr. */
};
/*****************************************
* Only used when fork support is enabled
*/
class ThreadInternalsPosix
: public grpc_core::internal::ThreadInternalsInterface {
public:
ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
void* arg, bool* success)
: started_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
pthread_attr_t attr;
/* don't use gpr_malloc as we may cause an infinite recursion with
* the profiling code */
thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
GPR_ASSERT(info != nullptr);
info->thread = this;
info->body = thd_body;
info->arg = arg;
info->name = thd_name;
inc_thd_count();
GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
0);
*success =
(pthread_create(&pthread_id_, &attr,
[](void* v) -> void* {
thd_arg arg = *static_cast<thd_arg*>(v);
free(v);
if (arg.name != nullptr) {
#if GPR_APPLE_PTHREAD_NAME
/* Apple supports 64 characters, and will
* truncate if it's longer. */
pthread_setname_np(arg.name);
#elif GPR_LINUX_PTHREAD_NAME
/* Linux supports 16 characters max, and will
* error if it's longer. */
char buf[16];
size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
strncpy(buf, arg.name, buf_len);
buf[buf_len] = '\0';
pthread_setname_np(pthread_self(), buf);
#endif // GPR_APPLE_PTHREAD_NAME
}
gpr_mu_lock(&arg.thread->mu_);
while (!arg.thread->started_) {
gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&arg.thread->mu_);
(*arg.body)(arg.arg);
dec_thd_count();
return nullptr;
},
info) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
if (!success) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(info);
dec_thd_count();
}
};
void inc_thd_count() {
if (grpc_fork_support_enabled()) {
gpr_mu_lock(&g_mu);
g_thread_count++;
gpr_mu_unlock(&g_mu);
~ThreadInternalsPosix() override {
gpr_mu_destroy(&mu_);
gpr_cv_destroy(&ready_);
}
}
void dec_thd_count() {
if (grpc_fork_support_enabled()) {
gpr_mu_lock(&g_mu);
g_thread_count--;
if (g_awaiting_threads && g_thread_count == 0) {
gpr_cv_signal(&g_cv);
}
gpr_mu_unlock(&g_mu);
void Start() override {
gpr_mu_lock(&mu_);
started_ = true;
gpr_cv_signal(&ready_);
gpr_mu_unlock(&mu_);
}
}
} // namespace
void Join() override { pthread_join(pthread_id_, nullptr); }
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success)
: real_(true), alive_(false), started_(false), joined_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
pthread_attr_t attr;
/* don't use gpr_malloc as we may cause an infinite recursion with
* the profiling code */
thd_arg* a = static_cast<thd_arg*>(malloc(sizeof(*a)));
GPR_ASSERT(a != nullptr);
a->thread = this;
a->body = thd_body;
a->arg = arg;
a->name = thd_name;
inc_thd_count();
GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
pthread_t p;
alive_ = (pthread_create(&p, &attr,
[](void* v) -> void* {
thd_arg a = *static_cast<thd_arg*>(v);
free(v);
if (a.name != nullptr) {
#if GPR_APPLE_PTHREAD_NAME
/* Apple supports 64 characters, and will
* truncate if it's longer. */
pthread_setname_np(a.name);
#elif GPR_LINUX_PTHREAD_NAME
/* Linux supports 16 characters max, and will
* error if it's longer. */
char buf[16];
size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
strncpy(buf, a.name, buf_len);
buf[buf_len] = '\0';
pthread_setname_np(pthread_self(), buf);
#endif // GPR_APPLE_PTHREAD_NAME
}
private:
/*****************************************
* Only used when fork support is enabled
*/
gpr_mu_lock(&a.thread->mu_);
if (!a.thread->started_) {
gpr_cv_wait(&a.thread->ready_, &a.thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&a.thread->mu_);
(*a.body)(a.arg);
dec_thd_count();
return nullptr;
},
a) == 0);
static void inc_thd_count() {
if (grpc_fork_support_enabled()) {
gpr_mu_lock(&g_mu);
g_thread_count++;
gpr_mu_unlock(&g_mu);
}
}
if (success != nullptr) {
*success = alive_;
static void dec_thd_count() {
if (grpc_fork_support_enabled()) {
gpr_mu_lock(&g_mu);
g_thread_count--;
if (g_awaiting_threads && g_thread_count == 0) {
gpr_cv_signal(&g_cv);
}
gpr_mu_unlock(&g_mu);
}
}
id_ = gpr_thd_id(p);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
gpr_mu mu_;
gpr_cv ready_;
bool started_;
pthread_t pthread_id_;
};
if (!alive_) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(a);
dec_thd_count();
}
}
} // namespace
Thread::~Thread() {
if (!alive_) {
// This thread never existed, so nothing to do
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success) {
bool outcome;
impl_ =
grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
if (outcome) {
state_ = ALIVE;
} else {
GPR_ASSERT(joined_);
}
if (real_) {
gpr_mu_destroy(&mu_);
gpr_cv_destroy(&ready_);
state_ = FAILED;
grpc_core::Delete(impl_);
impl_ = nullptr;
}
}
void Thread::Start() {
gpr_mu_lock(&mu_);
if (alive_) {
started_ = true;
gpr_cv_signal(&ready_);
}
gpr_mu_unlock(&mu_);
}
void Thread::Join() {
if (alive_) {
pthread_join(pthread_t(id_), nullptr);
if (success != nullptr) {
*success = outcome;
}
joined_ = true;
}
void Thread::Init() {
@ -180,7 +192,8 @@ bool Thread::AwaitAll(gpr_timespec deadline) {
gpr_mu_lock(&g_mu);
g_awaiting_threads = 1;
int res = 0;
if (g_thread_count > 0) {
while ((g_thread_count > 0) &&
(gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
res = gpr_cv_wait(&g_cv, &g_mu, deadline);
}
g_awaiting_threads = 0;

@ -29,6 +29,8 @@
#include <grpc/support/thd_id.h>
#include <string.h>
#include "src/core/lib/gprpp/memory.h"
#if defined(_MSC_VER)
#define thread_local __declspec(thread)
#define WIN_LAMBDA
@ -40,8 +42,9 @@
#endif
namespace {
class ThreadInternalsWindows;
struct thd_info {
grpc_core::Thread* thread;
ThreadInternalsWindows* thread;
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
HANDLE join_event; /* the join event */
@ -54,6 +57,77 @@ void destroy_thread(struct thd_info* t) {
CloseHandle(t->join_event);
gpr_free(t);
}
class ThreadInternalsWindows
: public grpc_core::internal::ThreadInternalsInterface {
public:
ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg,
bool* success) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
HANDLE handle;
info_ = (struct thd_info*)gpr_malloc(sizeof(*info_));
info->thread = this;
info->body = thd_body;
info->arg = arg;
info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info->join_event == nullptr) {
gpr_free(info_);
*success = false;
} else {
handle = CreateThread(
nullptr, 64 * 1024,
[](void* v) WIN_LAMBDA -> DWORD {
g_thd_info = static_cast<thd_info*>(v);
gpr_mu_lock(&g_thd_info->thread->mu_);
while (!g_thd_info->thread->started_) {
gpr_cv_wait(&g_thd_info->thread->ready_, &g_thd_info->thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&g_thd_info->thread->mu_);
g_thd_info->body(g_thd_info->arg);
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
return 0;
},
info, 0, nullptr);
if (handle == nullptr) {
destroy_thread(info_);
*success_ = false;
} else {
CloseHandle(handle);
*success = true;
}
}
}
~ThreadInternalsWindows() override {
gpr_mu_destroy(&mu_);
gpr_cv_destroy(&ready_);
}
void Start() override {
gpr_mu_lock(&mu_);
started_ = true;
gpr_cv_signal(&ready_);
gpr_mu_unlock(&mu_);
}
void Join() override {
DWORD ret = WaitForSingleObject(info_->join_event, INFINITE);
GPR_ASSERT(ret == WAIT_OBJECT_0);
destroy_thread(info_);
}
private:
gpr_mu mu_;
gpr_cv ready_;
bool started_;
thd_info* info_;
};
} // namespace
namespace grpc_core {
@ -66,82 +140,22 @@ bool Thread::AwaitAll(gpr_timespec deadline) {
}
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success)
: real_(true), alive_(false), started_(false), joined_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
HANDLE handle;
struct thd_info* info = (struct thd_info*)gpr_malloc(sizeof(*info));
info->thread = this;
info->body = thd_body;
info->arg = arg;
info->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info->join_event == nullptr) {
gpr_free(info);
alive_ = false;
bool* success) {
bool outcome;
impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
if (outcome) {
state_ = ALIVE;
} else {
handle = CreateThread(nullptr, 64 * 1024,
[](void* v) WIN_LAMBDA -> DWORD {
g_thd_info = static_cast<thd_info*>(v);
gpr_mu_lock(&g_thd_info->thread->mu_);
if (!g_thd_info->thread->started_) {
gpr_cv_wait(&g_thd_info->thread->ready_,
&g_thd_info->thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&g_thd_info->thread->mu_);
g_thd_info->body(g_thd_info->arg);
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
return 0;
},
info, 0, nullptr);
if (handle == nullptr) {
destroy_thread(info);
alive_ = false;
} else {
id_ = (gpr_thd_id)info;
CloseHandle(handle);
alive_ = true;
}
}
if (success != nullptr) {
*success = alive_;
}
}
Thread::~Thread() {
if (!alive_) {
// This thread never existed, so nothing to do
} else {
GPR_ASSERT(joined_);
}
if (real_) {
gpr_mu_destroy(&mu_);
gpr_cv_destroy(&ready_);
state_ = FAILED;
grpc_core::Delete(impl_);
impl_ = nullptr;
}
}
void Thread::Start() {
gpr_mu_lock(&mu_);
if (alive_) {
started_ = true;
gpr_cv_signal(&ready_);
if (success != nullptr) {
*success = outcome;
}
gpr_mu_unlock(&mu_);
}
void Thread::Join() {
if (alive_) {
thd_info* info = (thd_info*)id_;
DWORD ret = WaitForSingleObject(info->join_event, INFINITE);
GPR_ASSERT(ret == WAIT_OBJECT_0);
destroy_thread(info);
}
joined_ = true;
}
} // namespace grpc_core
gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)g_thd_info; }

@ -31,7 +31,6 @@
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -259,7 +258,9 @@ typedef struct poll_args {
grpc_core::Thread poller_thd;
gpr_cv trigger;
int trigger_set;
bool harvestable;
gpr_cv harvest;
bool joinable;
gpr_cv join;
struct pollfd* fds;
nfds_t nfds;
@ -1372,6 +1373,8 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
gpr_cv_init(&pargs->trigger);
gpr_cv_init(&pargs->harvest);
gpr_cv_init(&pargs->join);
pargs->harvestable = false;
pargs->joinable = false;
pargs->fds = fds;
pargs->nfds = count;
pargs->next = nullptr;
@ -1380,7 +1383,7 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
init_result(pargs);
cache_poller_locked(pargs);
gpr_ref(&g_cvfds.pollcount);
new (&pargs->poller_thd) grpc_core::Thread("grpc_poller", &run_poll, pargs);
pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
pargs->poller_thd.Start();
return pargs;
}
@ -1464,10 +1467,13 @@ static void cache_harvest_locked() {
if (poll_cache.dead_pollers) {
poll_cache.dead_pollers->prev = nullptr;
}
args->harvestable = true;
gpr_cv_signal(&args->harvest);
gpr_cv_wait(&args->join, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
while (!args->joinable) {
gpr_cv_wait(&args->join, &g_cvfds.mu,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
args->poller_thd.Join();
args->poller_thd.~Thread();
gpr_free(args);
}
}
@ -1533,8 +1539,11 @@ static void run_poll(void* args) {
if (gpr_unref(&g_cvfds.pollcount)) {
gpr_cv_signal(&g_cvfds.shutdown_cv);
}
gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
while (!pargs->harvestable) {
gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
pargs->joinable = true;
gpr_cv_signal(&pargs->join);
gpr_mu_unlock(&g_cvfds.mu);
}

@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/executor.h"
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
@ -102,11 +101,11 @@ void grpc_executor_set_threading(bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_init(&g_thread_state[i].mu);
gpr_cv_init(&g_thread_state[i].cv);
new (&g_thread_state[i].thd) grpc_core::Thread();
g_thread_state[i].thd = grpc_core::Thread();
g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
}
new (&g_thread_state[0].thd)
g_thread_state[0].thd =
grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
g_thread_state[0].thd.Start();
} else {
@ -126,7 +125,6 @@ void grpc_executor_set_threading(bool threading) {
}
gpr_atm_no_barrier_store(&g_cur_threads, 0);
for (size_t i = 0; i < g_max_threads; i++) {
g_thread_state[i].thd.~Thread();
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
run_closures(g_thread_state[i].elems);
@ -266,7 +264,7 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
if (cur_thread_count < g_max_threads) {
gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
new (&g_thread_state[cur_thread_count].thd)
g_thread_state[cur_thread_count].thd =
grpc_core::Thread("grpc_executor", executor_thread,
&g_thread_state[cur_thread_count]);
g_thread_state[cur_thread_count].thd.Start();

@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -69,7 +68,6 @@ static void gc_completed_threads(void) {
gpr_mu_unlock(&g_mu);
while (to_gc != nullptr) {
to_gc->thd.Join();
to_gc->thd.~Thread();
completed_thread* next = to_gc->next;
gpr_free(to_gc);
to_gc = next;
@ -88,7 +86,7 @@ static void start_timer_thread_and_unlock(void) {
}
completed_thread* ct =
static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
new (&ct->thd) grpc_core::Thread("grpc_global_timer", timer_thread, ct);
ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
ct->thd.Start();
}

@ -183,7 +183,7 @@ static void finish_writing(void) {
pthread_cond_signal(&g_cv);
pthread_mutex_unlock(&g_mu);
g_writing_thread->Join();
delete g_writing_thread;
grpc_core::Delete(g_writing_thread);
gpr_log(GPR_INFO, "flushing logs");
@ -202,8 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) {
}
static void init_output() {
g_writing_thread =
new grpc_core::Thread("timer_output_thread", writing_thread, nullptr);
g_writing_thread = grpc_core::New<grpc_core::Thread>("timer_output_thread",
writing_thread, nullptr);
atexit(finish_writing);
}

@ -57,7 +57,6 @@ void grpc_tsi_alts_shutdown() {
grpc_completion_queue_destroy(g_alts_resource.cq);
grpc_channel_destroy(g_alts_resource.channel);
g_alts_resource.thread.Join();
g_alts_resource.thread.~Thread();
}
gpr_cv_destroy(&g_alts_resource.cv);
gpr_mu_destroy(&g_alts_resource.mu);

@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include <string.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
@ -551,7 +550,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy);
// Start proxy thread.
new (&proxy->thd) grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
proxy->thd = grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
proxy->thd.Start();
return proxy;
}
@ -566,7 +565,6 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
gpr_unref(&proxy->users); // Signal proxy thread to shutdown.
grpc_core::ExecCtx exec_ctx;
proxy->thd.Join();
proxy->thd.~Thread();
grpc_tcp_server_shutdown_listeners(proxy->server);
grpc_tcp_server_unref(proxy->server);
gpr_free(proxy->proxy_name);

@ -19,7 +19,6 @@
#include "test/core/end2end/fixtures/proxy.h"
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -98,7 +97,7 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
grpc_server_start(proxy->server);
grpc_call_details_init(&proxy->new_call_details);
new (&proxy->thd) grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
proxy->thd.Start();
request_call(proxy);
@ -123,7 +122,6 @@ void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
new_closure(shutdown_complete, proxy));
proxy->thd.Join();
proxy->thd.~Thread();
gpr_free(proxy->proxy_port);
gpr_free(proxy->server_port);
grpc_server_destroy(proxy->server);

@ -20,7 +20,6 @@
#include <inttypes.h>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -102,7 +101,7 @@ static void concurrent_test(void) {
grpc_core::Thread thds[CONCURRENT_TEST_THREADS];
for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) {
new (&thds[i])
thds[i] =
grpc_core::Thread("grpc_concurrent_test", concurrent_test_body, &args);
thds[i].Start();
}

@ -25,7 +25,6 @@
#include <stdio.h>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -118,7 +117,7 @@ static void cpu_test(void) {
static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd) * nthreads));
for (i = 0; i < nthreads; i++) {
new (&thd[i]) grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct);
thd[i] = grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct);
thd[i].Start();
}
gpr_mu_lock(&ct.mu);
@ -128,7 +127,6 @@ static void cpu_test(void) {
gpr_mu_unlock(&ct.mu);
for (i = 0; i < nthreads; i++) {
thd[i].Join();
thd[i].~Thread();
}
gpr_free(thd);
fprintf(stderr, "Saw cores [");

@ -19,7 +19,6 @@
#include "src/core/lib/gpr/mpscq.h"
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -85,7 +84,7 @@ static void test_mt(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
new (&thds[i]) grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
thds[i] = grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
thds[i].Start();
}
size_t num_done = 0;
@ -155,7 +154,7 @@ static void test_mt_multipop(void) {
ta[i].ctr = 0;
ta[i].q = &q;
ta[i].start = &start;
new (&thds[i]) grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
thds[i] = grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
thds[i].Start();
}
pull_args pa;
@ -167,8 +166,7 @@ static void test_mt_multipop(void) {
pa.start = &start;
gpr_mu_init(&pa.mu);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
new (&pull_thds[i])
grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
pull_thds[i] = grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
pull_thds[i].Start();
}
gpr_event_set(&start, (void*)1);

@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -69,7 +68,7 @@ static void test_destroy(struct test* m) {
static void test_create_threads(struct test* m, void (*body)(void* arg)) {
int i;
for (i = 0; i != m->thread_count; i++) {
new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i].Start();
}
}
@ -79,7 +78,6 @@ static void test_wait(struct test* m) {
int i;
for (i = 0; i != m->thread_count; i++) {
m->threads[i].Join();
m->threads[i].~Thread();
}
}

@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -196,7 +195,7 @@ static void test_destroy(struct test* m) {
static void test_create_threads(struct test* m, void (*body)(void* arg)) {
int i;
for (i = 0; i != m->nthreads; i++) {
new (&m->threads[i]) grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i] = grpc_core::Thread("grpc_create_threads", body, m);
m->threads[i].Start();
}
}
@ -210,7 +209,6 @@ static void test_wait(struct test* m) {
gpr_mu_unlock(&m->mu);
for (int i = 0; i != m->nthreads; i++) {
m->threads[i].Join();
m->threads[i].~Thread();
}
}
@ -258,7 +256,7 @@ static void test(const char* name, void (*body)(void* m),
m = test_new(10, iterations, incr_step);
grpc_core::Thread extra_thd;
if (extra != nullptr) {
new (&extra_thd) grpc_core::Thread(name, extra, m);
extra_thd = grpc_core::Thread(name, extra, m);
extra_thd.Start();
m->done++; /* one more thread to wait for */
}

@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@ -56,7 +55,7 @@ int main(int argc, char* argv[]) {
gpr_tls_init(&test_var);
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_tls_test", thd_body, nullptr);
th = grpc_core::Thread("grpc_tls_test", thd_body, nullptr);
th.Start();
}
for (auto& th : threads) {

@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@ -60,7 +59,7 @@ static void test1(void) {
t.n = NUM_THREADS;
t.is_done = 0;
for (auto& th : thds) {
new (&th) grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t);
th = grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t);
th.Start();
}
gpr_mu_lock(&t.mu);
@ -81,8 +80,7 @@ static void test2(void) {
grpc_core::Thread thds[NUM_THREADS];
for (auto& th : thds) {
bool ok;
new (&th)
grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok);
th = grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok);
GPR_ASSERT(ok);
th.Start();
}

@ -18,8 +18,6 @@
#include "src/core/lib/iomgr/combiner.h"
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -105,8 +103,7 @@ static void test_execute_many(void) {
ta[i].ctr = 0;
ta[i].lock = lock;
gpr_event_init(&ta[i].done);
new (&thds[i])
grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
thds[i].Start();
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {

@ -25,7 +25,6 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -262,7 +261,7 @@ static void test_threading(void) {
grpc_core::Thread thds[10];
for (auto& th : thds) {
new (&th) grpc_core::Thread("test_thread", test_threading_loop, &shared);
th = grpc_core::Thread("test_thread", test_threading_loop, &shared);
th.Start();
}
grpc_wakeup_fd fd;

@ -20,7 +20,6 @@
#include <string.h>
#include <sys/un.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -106,7 +105,7 @@ static void actually_poll(void* argsp) {
static void poll_pollset_until_request_done(args_struct* args) {
gpr_atm_rel_store(&args->done_atm, 0);
new (&args->thd) grpc_core::Thread("grpc_poll_pollset", actually_poll, args);
args->thd = grpc_core::Thread("grpc_poll_pollset", actually_poll, args);
args->thd.Start();
}

@ -18,8 +18,6 @@
#include "src/core/lib/surface/completion_queue.h"
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -96,7 +94,7 @@ static void test_too_many_plucks(void) {
}
thread_states[i].cc = cc;
thread_states[i].tag = tags[i];
new (&threads[i])
threads[i] =
grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
threads[i].Start();
}
@ -234,7 +232,7 @@ static void test_threading(size_t producers, size_t consumers) {
options[i].id = optid++;
bool ok;
new (&threads[i]) grpc_core::Thread(
threads[i] = grpc_core::Thread(
i < producers ? "grpc_producer" : "grpc_consumer",
i < producers ? producer_thread : consumer_thread, options + i, &ok);
GPR_ASSERT(ok);
@ -273,7 +271,6 @@ static void test_threading(size_t producers, size_t consumers) {
for (i = 0; i < producers + consumers; i++) {
threads[i].Join();
threads[i].~Thread();
}
gpr_free(threads);

@ -24,7 +24,6 @@
#include <memory.h>
#include <stdio.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -179,8 +178,7 @@ int run_concurrent_connectivity_test() {
char* localhost = gpr_strdup("localhost:54321");
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
th.Start();
}
for (auto& th : threads) {
@ -204,8 +202,7 @@ int run_concurrent_connectivity_test() {
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
@ -231,8 +228,7 @@ int run_concurrent_connectivity_test() {
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
@ -291,8 +287,8 @@ int run_concurrent_watches_with_short_timeouts_test() {
char* localhost = gpr_strdup("localhost:54321");
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_short_watches",
watches_with_short_timeouts, localhost);
th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
localhost);
th.Start();
}
for (auto& th : threads) {

Loading…
Cancel
Save