Merge branch '2phase_thd' of github.com:vjpai/grpc into 2phase_thd

reviewable/pr14459/r1
Vijay Pai 7 years ago
commit 1d71d461df
  1. 5
      src/core/lib/gprpp/thd.h
  2. 64
      src/core/lib/gprpp/thd_posix.cc
  3. 4
      src/core/lib/iomgr/ev_poll_posix.cc
  4. 17
      src/core/lib/iomgr/executor.cc
  5. 2
      src/core/lib/iomgr/iomgr.cc
  6. 2
      src/core/lib/iomgr/resolve_address_posix.cc
  7. 2
      src/core/lib/iomgr/timer_manager.cc
  8. 2
      src/core/lib/iomgr/wakeup_fd_cv.cc
  9. 4
      src/core/lib/profiling/basic_timers.cc
  10. 11
      test/core/end2end/bad_server_response_test.cc
  11. 2
      test/core/end2end/fixtures/http_proxy_fixture.cc
  12. 4
      test/core/end2end/fixtures/proxy.cc
  13. 8
      test/core/gpr/arena_test.cc
  14. 4
      test/core/gpr/cpu_test.cc
  15. 10
      test/core/gpr/mpscq_test.cc
  16. 2
      test/core/gpr/spinlock_test.cc
  17. 6
      test/core/gpr/sync_test.cc
  18. 2
      test/core/gpr/tls_test.cc
  19. 14
      test/core/gprpp/thd_test.cc
  20. 6
      test/core/iomgr/combiner_test.cc
  21. 2
      test/core/iomgr/ev_epollsig_linux_test.cc
  22. 4
      test/core/iomgr/resolve_address_posix_test.cc
  23. 142
      test/core/iomgr/wakeup_fd_cv_test.cc
  24. 2
      test/core/network_benchmarks/low_level_ping_pong.cc
  25. 14
      test/core/surface/completion_queue_threading_test.cc
  26. 129
      test/core/surface/concurrent_connectivity_test.cc
  27. 2
      test/cpp/client/client_channel_stress_test.cc

@ -35,10 +35,10 @@ class Thread {
/// Default constructor only to allow use in structs that lack constructors
/// Does not produce a validly-constructed thread; must later
/// use placement new to construct a real thread. Does not init mu_ and cv_
Thread(): real_(false), alive_(false), started_(false), joined_(false) {}
Thread() : real_(false), alive_(false), started_(false), joined_(false) {}
Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success = nullptr);
bool* success = nullptr);
~Thread() {
if (!alive_) {
// This thread never existed, so nothing to do
@ -63,6 +63,7 @@ class Thread {
static void Init();
static bool AwaitAll(gpr_timespec deadline);
private:
gpr_mu mu_;
gpr_cv ready_;

@ -75,8 +75,8 @@ void dec_thd_count() {
} // namespace
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success):
real_(true), alive_(false), started_(false), joined_(false) {
bool* success)
: real_(true), alive_(false), started_(false), joined_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
pthread_attr_t attr;
@ -94,36 +94,42 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == 0);
pthread_t p;
alive_ = (pthread_create(&p, &attr, [](void *v) -> void* {
thd_arg a = *static_cast<thd_arg*>(v);
free(v);
if (a.name != nullptr) {
alive_ = (pthread_create(&p, &attr,
[](void* v) -> void* {
thd_arg a = *static_cast<thd_arg*>(v);
free(v);
if (a.name != nullptr) {
#if GPR_APPLE_PTHREAD_NAME
/* Apple supports 64 characters, and will truncate if it's longer. */
pthread_setname_np(a.name);
/* Apple supports 64 characters, and will
* truncate if it's longer. */
pthread_setname_np(a.name);
#elif GPR_LINUX_PTHREAD_NAME
/* Linux supports 16 characters max, and will error if it's longer. */
char buf[16];
size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
strncpy(buf, a.name, buf_len);
buf[buf_len] = '\0';
pthread_setname_np(pthread_self(), buf);
/* Linux supports 16 characters max, and will
* error if it's longer. */
char buf[16];
size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
strncpy(buf, a.name, buf_len);
buf[buf_len] = '\0';
pthread_setname_np(pthread_self(), buf);
#endif // GPR_APPLE_PTHREAD_NAME
}
gpr_mu_lock(&a.thread->mu_);
if (!a.thread->started_) {
gpr_cv_wait(&a.thread->ready_, &a.thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&a.thread->mu_);
(*a.body)(a.arg);
dec_thd_count();
return nullptr;
}, a) == 0);
if (success != nullptr) { *success = alive_; }
}
gpr_mu_lock(&a.thread->mu_);
if (!a.thread->started_) {
gpr_cv_wait(&a.thread->ready_, &a.thread->mu_,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&a.thread->mu_);
(*a.body)(a.arg);
dec_thd_count();
return nullptr;
},
a) == 0);
if (success != nullptr) {
*success = alive_;
}
id_ = reinterpret_cast<gpr_thd_id>(p);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);

@ -22,7 +22,6 @@
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include <new>
#include <assert.h>
#include <errno.h>
#include <limits.h>
@ -30,6 +29,7 @@
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -37,9 +37,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/murmur_hash.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"

@ -18,8 +18,8 @@
#include "src/core/lib/iomgr/executor.h"
#include <new>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
@ -28,9 +28,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#define MAX_DEPTH 2
@ -104,9 +104,8 @@ void grpc_executor_set_threading(bool threading) {
g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
}
new (&g_thread_state[0].thd) grpc_core::Thread("grpc_executor",
executor_thread,
&g_thread_state[0]);
new (&g_thread_state[0].thd)
grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
g_thread_state[0].thd.Start();
} else {
if (cur_threads == 0) return;
@ -265,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
if (cur_thread_count < g_max_threads) {
gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
new (&g_thread_state[cur_thread_count].thd)
grpc_core::Thread("grpc_executor", executor_thread,
&g_thread_state[cur_thread_count]);
g_thread_state[cur_thread_count].thd.Start();
new (&g_thread_state[cur_thread_count].thd)
grpc_core::Thread("grpc_executor", executor_thread,
&g_thread_state[cur_thread_count]);
g_thread_state[cur_thread_count].thd.Start();
}
gpr_spinlock_unlock(&g_adding_thread_lock);
}

@ -31,8 +31,8 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"

@ -33,8 +33,8 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"

@ -18,8 +18,8 @@
#include "src/core/lib/iomgr/timer_manager.h"
#include <new>
#include <inttypes.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

@ -30,8 +30,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#define MAX_TABLE_RESIZE 256

@ -202,8 +202,8 @@ void gpr_timers_set_log_filename(const char* filename) {
}
static void init_output() {
g_writing_thread = new grpc_core::Thread("timer_output_thread",
writing_thread, nullptr);
g_writing_thread =
new grpc_core::Thread("timer_output_thread", writing_thread, nullptr);
atexit(finish_writing);
}

@ -254,16 +254,15 @@ static void actually_poll_server(void* arg) {
gpr_free(pa);
}
static grpc_core::Thread*
poll_server_until_read_done(test_tcp_server* server,
gpr_event* signal_when_done) {
static grpc_core::Thread* poll_server_until_read_done(
test_tcp_server* server, gpr_event* signal_when_done) {
gpr_atm_rel_store(&state.done_atm, 0);
state.write_done = 0;
poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa)));
pa->server = server;
pa->signal_when_done = signal_when_done;
auto* th = grpc_core::New<grpc_core::Thread>("grpc_poll_server",
actually_poll_server, pa);
actually_poll_server, pa);
th->Start();
return th;
}
@ -285,8 +284,8 @@ static void run_test(const char* response_payload,
state.response_payload_length = response_payload_length;
/* poll server until sending out the response */
grpc_core::UniquePtr<grpc_core::Thread>
thdptr(poll_server_until_read_done(&test_server, &ev));
grpc_core::UniquePtr<grpc_core::Thread> thdptr(
poll_server_until_read_done(&test_server, &ev));
start_rpc(server_port, expected_status, expected_detail);
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
thdptr->Join();

@ -20,8 +20,8 @@
#include "src/core/lib/iomgr/sockaddr.h"
#include <new>
#include <string.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>

@ -18,16 +18,16 @@
#include "test/core/end2end/fixtures/proxy.h"
#include <new>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/port.h"
struct grpc_end2end_proxy {

@ -18,9 +18,9 @@
#include "src/core/lib/gpr/arena.h"
#include <new>
#include <inttypes.h>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -28,8 +28,8 @@
#include <grpc/support/sync.h>
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
static void test_noop(void) { gpr_arena_destroy(gpr_arena_create(1)); }
@ -102,8 +102,8 @@ static void concurrent_test(void) {
grpc_core::Thread thds[CONCURRENT_TEST_THREADS];
for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) {
new (&thds[i]) grpc_core::Thread("grpc_concurrent_test",
concurrent_test_body, &args);
new (&thds[i])
grpc_core::Thread("grpc_concurrent_test", concurrent_test_body, &args);
thds[i].Start();
}

@ -23,9 +23,9 @@
#include <grpc/support/cpu.h>
#include <new>
#include <stdio.h>
#include <string.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -115,7 +115,7 @@ static void cpu_test(void) {
uint32_t nthreads = ct.ncores * 3;
grpc_core::Thread* thd =
static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd)*nthreads));
static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*thd) * nthreads));
for (i = 0; i < nthreads; i++) {
new (&thd[i]) grpc_core::Thread("grpc_cpu_test", &worker_thread, &ct);

@ -18,15 +18,15 @@
#include "src/core/lib/gpr/mpscq.h"
#include <new>
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
typedef struct test_node {
@ -167,12 +167,12 @@ static void test_mt_multipop(void) {
pa.start = &start;
gpr_mu_init(&pa.mu);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
new (&pull_thds[i]) grpc_core::Thread("grpc_multipop_pull",
pull_thread, &pa);
new (&pull_thds[i])
grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
pull_thds[i].Start();
}
gpr_event_set(&start, (void*)1);
for (auto& pth: pull_thds) {
for (auto& pth : pull_thds) {
pth.Join();
}
gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, pa.spins);

@ -20,9 +20,9 @@
#include "src/core/lib/gpr/spinlock.h"
#include <new>
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

@ -20,9 +20,9 @@
#include <grpc/support/sync.h>
#include <new>
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -164,8 +164,8 @@ struct test {
static struct test* test_new(int nthreads, int64_t iterations, int incr_step) {
struct test* m = static_cast<struct test*>(gpr_malloc(sizeof(*m)));
m->nthreads = nthreads;
m->threads =
static_cast<grpc_core::Thread*>(gpr_malloc(sizeof(*m->threads) * nthreads));
m->threads = static_cast<grpc_core::Thread*>(
gpr_malloc(sizeof(*m->threads) * nthreads));
m->iterations = iterations;
m->counter = 0;
m->thread_count = 0;

@ -20,9 +20,9 @@
#include "src/core/lib/gpr/tls.h"
#include <new>
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

@ -20,9 +20,9 @@
#include "src/core/lib/gprpp/thd.h"
#include <new>
#include <stdio.h>
#include <stdlib.h>
#include <new>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@ -59,7 +59,7 @@ static void test1(void) {
gpr_cv_init(&t.done_cv);
t.n = NUM_THREADS;
t.is_done = 0;
for (auto& th: thds) {
for (auto& th : thds) {
new (&th) grpc_core::Thread("grpc_thread_body1_test", &thd_body1, &t);
th.Start();
}
@ -68,7 +68,7 @@ static void test1(void) {
gpr_cv_wait(&t.done_cv, &t.mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&t.mu);
for (auto& th: thds) {
for (auto& th : thds) {
th.Join();
}
GPR_ASSERT(t.n == 0);
@ -79,14 +79,14 @@ static void thd_body2(void* v) {}
/* Test that we can create a number of threads and join them. */
static void test2(void) {
grpc_core::Thread thds[NUM_THREADS];
for (auto& th: thds) {
for (auto& th : thds) {
bool ok;
new (&th) grpc_core::Thread("grpc_thread_body2_test", &thd_body2,
nullptr, &ok);
new (&th)
grpc_core::Thread("grpc_thread_body2_test", &thd_body2, nullptr, &ok);
GPR_ASSERT(ok);
th.Start();
}
for (auto& th: thds) {
for (auto& th : thds) {
th.Join();
}
}

@ -24,8 +24,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
static void test_no_op(void) {
@ -105,8 +105,8 @@ static void test_execute_many(void) {
ta[i].ctr = 0;
ta[i].lock = lock;
gpr_event_init(&ta[i].done);
new (&thds[i]) grpc_core::Thread("grpc_execute_many",
execute_many_loop, &ta[i]);
new (&thds[i])
grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
thds[i].Start();
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {

@ -30,8 +30,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"

@ -18,9 +18,9 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include <new>
#include <string.h>
#include <sys/un.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -28,8 +28,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"

@ -134,89 +134,89 @@ void test_poll_cv_trigger(void) {
pargs.result = -2;
{
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
// Wakeup wakeup_fd not listening for events
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE);
thd.Join();
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
// Wakeup wakeup_fd not listening for events
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE);
thd.Join();
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
}
{
// Pollin on socket fd
pargs.timeout = -1;
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
trigger_socket_event();
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == POLLIN);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on socket fd
pargs.timeout = -1;
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
trigger_socket_event();
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == POLLIN);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
}
{
// Pollin on wakeup fd
reset_socket_event();
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE);
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on wakeup fd
reset_socket_event();
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE);
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
}
{
// Pollin on wakeupfd before poll()
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on wakeupfd before poll()
pargs.result = -2;
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
thd.Join();
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
}
{
// No Events
pargs.result = -2;
pargs.timeout = 1000;
reset_socket_event();
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE);
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
thd.Join();
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// No Events
pargs.result = -2;
pargs.timeout = 1000;
reset_socket_event();
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE);
grpc_core::Thread thd("grpc_background_poll", &background_poll, &pargs);
thd.Start();
thd.Join();
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
}
}

@ -38,8 +38,8 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "test/core/util/cmdline.h"

@ -24,8 +24,8 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"
@ -96,8 +96,8 @@ static void test_too_many_plucks(void) {
}
thread_states[i].cc = cc;
thread_states[i].tag = tags[i];
new (&threads[i]) grpc_core::Thread("grpc_pluck_test", pluck_one,
thread_states + i);
new (&threads[i])
grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
threads[i].Start();
}
@ -221,8 +221,7 @@ static void test_threading(size_t producers, size_t consumers) {
"test_threading", producers, consumers);
/* start all threads: they will wait for phase1 */
grpc_core::Thread* threads =
reinterpret_cast<grpc_core::Thread*>(
grpc_core::Thread* threads = reinterpret_cast<grpc_core::Thread*>(
gpr_malloc(sizeof(*threads) * (producers + consumers)));
for (i = 0; i < producers + consumers; i++) {
gpr_event_init(&options[i].on_started);
@ -236,9 +235,8 @@ static void test_threading(size_t producers, size_t consumers) {
bool ok;
new (&threads[i]) grpc_core::Thread(
i < producers ? "grpc_producer" : "grpc_consumer",
i < producers ? producer_thread : consumer_thread,
options + i, &ok);
i < producers ? "grpc_producer" : "grpc_consumer",
i < producers ? producer_thread : consumer_thread, options + i, &ok);
GPR_ASSERT(ok);
threads[i].Start();
gpr_event_wait(&options[i].on_started, ten_seconds_time());

@ -22,9 +22,9 @@
headers. Therefore, sockaddr.h must always be included first */
#include "src/core/lib/iomgr/sockaddr.h"
#include <new>
#include <memory.h>
#include <stdio.h>
#include <new>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -175,75 +175,78 @@ int run_concurrent_connectivity_test() {
/* First round, no server */
{
gpr_log(GPR_DEBUG, "Wave 1");
char* localhost = gpr_strdup("localhost:54321");
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
gpr_free(localhost);
gpr_log(GPR_DEBUG, "Wave 1");
char* localhost = gpr_strdup("localhost:54321");
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
gpr_free(localhost);
}
{
/* Second round, actual grpc server */
gpr_log(GPR_DEBUG, "Wave 2");
int port = grpc_pick_unused_port_or_die();
gpr_asprintf(&args.addr, "localhost:%d", port);
args.server = grpc_server_create(nullptr, nullptr);
grpc_server_add_insecure_http2_port(args.server, args.addr);
args.cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(args.server, args.cq, nullptr);
grpc_server_start(args.server);
grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
server2.Start();
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
/* Second round, actual grpc server */
gpr_log(GPR_DEBUG, "Wave 2");
int port = grpc_pick_unused_port_or_die();
gpr_asprintf(&args.addr, "localhost:%d", port);
args.server = grpc_server_create(nullptr, nullptr);
grpc_server_add_insecure_http2_port(args.server, args.addr);
args.cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(args.server, args.cq, nullptr);
grpc_server_start(args.server);
grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
server2.Start();
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
server2.Join();
grpc_server_destroy(args.server);
grpc_completion_queue_destroy(args.cq);
gpr_free(args.addr);
server2.Join();
grpc_server_destroy(args.server);
grpc_completion_queue_destroy(args.cq);
gpr_free(args.addr);
}
{
/* Third round, bogus tcp server */
gpr_log(GPR_DEBUG, "Wave 3");
args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args.pollset, &args.mu);
gpr_event_init(&args.ready);
grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
server3.Start();
gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
/* Third round, bogus tcp server */
gpr_log(GPR_DEBUG, "Wave 3");
args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args.pollset, &args.mu);
gpr_event_init(&args.ready);
grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
server3.Start();
gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_core::Thread threads[NUM_THREADS];
for (auto& th : threads) {
new (&th)
grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr);
th.Start();
}
for (auto& th : threads) {
th.Join();
}
gpr_atm_rel_store(&args.stop, 1);
server3.Join();
{
grpc_core::ExecCtx exec_ctx;
grpc_pollset_shutdown(
args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset,
grpc_schedule_on_exec_ctx));
}
gpr_atm_rel_store(&args.stop, 1);
server3.Join();
{
grpc_core::ExecCtx exec_ctx;
grpc_pollset_shutdown(
args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset,
grpc_schedule_on_exec_ctx));
}
}
grpc_shutdown();
@ -289,7 +292,7 @@ int run_concurrent_watches_with_short_timeouts_test() {
for (auto& th : threads) {
new (&th) grpc_core::Thread("grpc_short_watches",
watches_with_short_timeouts, localhost);
watches_with_short_timeouts, localhost);
th.Start();
}
for (auto& th : threads) {

@ -35,8 +35,8 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/util/port.h"

Loading…
Cancel
Save