[iomgr] Remove executor/...

I've tried this before in #27445 and we found some internal usage of this code. Today I can find no such usage, so let's try again.
pull/30251/head
Craig Tiller authored 3 years ago, committed by GitHub
parent b5966f39eb
commit 1b5295a4a2
24 changed files (lines changed in parentheses):

  1. BUILD (4)
  2. CMakeLists.txt (60)
  3. Makefile (4)
  4. build_autogenerated.yaml (26)
  5. config.m4 (3)
  6. config.w32 (3)
  7. gRPC-C++.podspec (4)
  8. gRPC-Core.podspec (6)
  9. grpc.gemspec (4)
  10. grpc.gyp (4)
  11. package.xml (4)
  12. src/core/lib/iomgr/executor/mpmcqueue.cc (182)
  13. src/core/lib/iomgr/executor/mpmcqueue.h (171)
  14. src/core/lib/iomgr/executor/threadpool.cc (136)
  15. src/core/lib/iomgr/executor/threadpool.h (150)
  16. src/python/grpcio/grpc_core_dependencies.py (2)
  17. test/core/iomgr/BUILD (26)
  18. test/core/iomgr/mpmcqueue_test.cc (227)
  19. test/core/iomgr/threadpool_test.cc (189)
  20. test/cpp/microbenchmarks/BUILD (15)
  21. test/cpp/microbenchmarks/bm_threadpool.cc (331)
  22. tools/doxygen/Doxyfile.c++.internal (4)
  23. tools/doxygen/Doxyfile.core.internal (4)
  24. tools/run_tests/generated/tests.json (48)

4
BUILD

@@ -2538,8 +2538,6 @@ grpc_cc_library(
"src/core/lib/iomgr/ev_poll_posix.cc",
"src/core/lib/iomgr/ev_posix.cc",
"src/core/lib/iomgr/ev_windows.cc",
"src/core/lib/iomgr/executor/mpmcqueue.cc",
"src/core/lib/iomgr/executor/threadpool.cc",
"src/core/lib/iomgr/fork_posix.cc",
"src/core/lib/iomgr/fork_windows.cc",
"src/core/lib/iomgr/gethostname_fallback.cc",
@@ -2654,8 +2652,6 @@ grpc_cc_library(
"src/core/lib/iomgr/ev_epoll1_linux.h",
"src/core/lib/iomgr/ev_poll_posix.h",
"src/core/lib/iomgr/ev_posix.h",
"src/core/lib/iomgr/executor/mpmcqueue.h",
"src/core/lib/iomgr/executor/threadpool.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/grpc_if_nametoindex.h",
"src/core/lib/iomgr/internal_errqueue.h",

60
CMakeLists.txt generated

@@ -828,7 +828,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c load_file_test)
add_dependencies(buildtests_c message_compress_test)
add_dependencies(buildtests_c minimal_stack_is_minimal_test)
add_dependencies(buildtests_c mpmcqueue_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c mpscq_test)
endif()
@@ -867,7 +866,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c test_core_iomgr_timer_heap_test)
add_dependencies(buildtests_c test_core_iomgr_timer_list_test)
add_dependencies(buildtests_c thd_test)
add_dependencies(buildtests_c threadpool_test)
add_dependencies(buildtests_c varint_test)
add_custom_target(buildtests_cxx)
@@ -2102,8 +2100,6 @@ add_library(grpc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@@ -2732,8 +2728,6 @@ add_library(grpc_unsecure
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/executor/mpmcqueue.cc
src/core/lib/iomgr/executor/threadpool.cc
src/core/lib/iomgr/fork_posix.cc
src/core/lib/iomgr/fork_windows.cc
src/core/lib/iomgr/gethostname_fallback.cc
@@ -5264,33 +5258,6 @@ target_link_libraries(minimal_stack_is_minimal_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(mpmcqueue_test
test/core/iomgr/mpmcqueue_test.cc
)
target_include_directories(mpmcqueue_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(mpmcqueue_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -5965,33 +5932,6 @@ target_link_libraries(thd_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(threadpool_test
test/core/iomgr/threadpool_test.cc
)
target_include_directories(threadpool_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(threadpool_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

4
Makefile generated

@@ -1484,8 +1484,6 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@@ -1954,8 +1952,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \

26
build_autogenerated.yaml

@@ -793,8 +793,6 @@ libs:
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/executor/mpmcqueue.h
- src/core/lib/iomgr/executor/threadpool.h
- src/core/lib/iomgr/gethostname.h
- src/core/lib/iomgr/grpc_if_nametoindex.h
- src/core/lib/iomgr/internal_errqueue.h
@@ -1463,8 +1461,6 @@ libs:
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/executor/mpmcqueue.cc
- src/core/lib/iomgr/executor/threadpool.cc
- src/core/lib/iomgr/fork_posix.cc
- src/core/lib/iomgr/fork_windows.cc
- src/core/lib/iomgr/gethostname_fallback.cc
@@ -2011,8 +2007,6 @@ libs:
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/executor/mpmcqueue.h
- src/core/lib/iomgr/executor/threadpool.h
- src/core/lib/iomgr/gethostname.h
- src/core/lib/iomgr/grpc_if_nametoindex.h
- src/core/lib/iomgr/internal_errqueue.h
@@ -2329,8 +2323,6 @@ libs:
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/executor/mpmcqueue.cc
- src/core/lib/iomgr/executor/threadpool.cc
- src/core/lib/iomgr/fork_posix.cc
- src/core/lib/iomgr/fork_windows.cc
- src/core/lib/iomgr/gethostname_fallback.cc
@@ -3565,15 +3557,6 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: mpmcqueue_test
build: test
language: c
headers: []
src:
- test/core/iomgr/mpmcqueue_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: mpscq_test
build: test
language: c
@@ -3834,15 +3817,6 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: threadpool_test
build: test
language: c
headers: []
src:
- test/core/iomgr/threadpool_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: varint_test
build: test
language: c

3
config.m4 generated

@@ -543,8 +543,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname_fallback.cc \
@@ -1323,7 +1321,6 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/gprpp)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/http)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr/executor)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/json)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/matchers)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/profiling)

3
config.w32 generated

@@ -509,8 +509,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\ev_windows.cc " +
"src\\core\\lib\\iomgr\\exec_ctx.cc " +
"src\\core\\lib\\iomgr\\executor.cc " +
"src\\core\\lib\\iomgr\\executor\\mpmcqueue.cc " +
"src\\core\\lib\\iomgr\\executor\\threadpool.cc " +
"src\\core\\lib\\iomgr\\fork_posix.cc " +
"src\\core\\lib\\iomgr\\fork_windows.cc " +
"src\\core\\lib\\iomgr\\gethostname_fallback.cc " +
@@ -1445,7 +1443,6 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\gprpp");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\http");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr\\executor");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\json");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\matchers");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\profiling");

4
gRPC-C++.podspec generated

@@ -751,8 +751,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',
@@ -1591,8 +1589,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',

6
gRPC-Core.podspec generated

@@ -1192,10 +1192,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname.h',
@@ -2203,8 +2199,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/executor/mpmcqueue.h',
'src/core/lib/iomgr/executor/threadpool.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/grpc_if_nametoindex.h',
'src/core/lib/iomgr/internal_errqueue.h',

4
grpc.gemspec generated

@@ -1106,10 +1106,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.cc )
s.files += %w( src/core/lib/iomgr/executor.h )
s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.cc )
s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.h )
s.files += %w( src/core/lib/iomgr/executor/threadpool.cc )
s.files += %w( src/core/lib/iomgr/executor/threadpool.h )
s.files += %w( src/core/lib/iomgr/fork_posix.cc )
s.files += %w( src/core/lib/iomgr/fork_windows.cc )
s.files += %w( src/core/lib/iomgr/gethostname.h )

4
grpc.gyp generated

@@ -834,8 +834,6 @@
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',
@@ -1296,8 +1294,6 @@
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',

4
package.xml generated

@@ -1088,10 +1088,6 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/mpmcqueue.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/mpmcqueue.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/threadpool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor/threadpool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/fork_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/fork_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname.h" role="src" />

182
src/core/lib/iomgr/executor/mpmcqueue.cc

@@ -1,182 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/executor/mpmcqueue.h"
namespace grpc_core {
DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");
inline void* InfLenFIFOQueue::PopFront() {
// Caller should already check queue is not empty and has already held the
// mutex. This function will assume that there is at least one element in the
// queue (i.e. queue_head_->content is valid).
void* result = queue_head_->content;
count_.store(count_.load(std::memory_order_relaxed) - 1,
std::memory_order_relaxed);
// Updates Stats when trace flag turned on.
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
gpr_timespec wait_time =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time);
stats_.num_completed++;
stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
stats_.max_queue_time = gpr_time_max(
gpr_convert_clock_type(stats_.max_queue_time, GPR_TIMESPAN), wait_time);
if (count_.load(std::memory_order_relaxed) == 0) {
stats_.busy_queue_time =
gpr_time_add(stats_.busy_queue_time,
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), busy_time));
}
gpr_log(GPR_INFO,
"[InfLenFIFOQueue PopFront] num_completed: %" PRIu64
" total_queue_time: %f max_queue_time: %f busy_queue_time: %f",
stats_.num_completed,
gpr_timespec_to_micros(stats_.total_queue_time),
gpr_timespec_to_micros(stats_.max_queue_time),
gpr_timespec_to_micros(stats_.busy_queue_time));
}
queue_head_ = queue_head_->next;
// Signal waiting thread
if (count_.load(std::memory_order_relaxed) > 0) {
TopWaiter()->cv.Signal();
}
return result;
}
InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) {
num_nodes_ = num_nodes_ + num;
Node* new_chunk = new Node[num];
new_chunk[0].next = &new_chunk[1];
new_chunk[num - 1].prev = &new_chunk[num - 2];
for (int i = 1; i < num - 1; ++i) {
new_chunk[i].prev = &new_chunk[i - 1];
new_chunk[i].next = &new_chunk[i + 1];
}
return new_chunk;
}
InfLenFIFOQueue::InfLenFIFOQueue() {
delete_list_size_ = kDeleteListInitSize;
delete_list_ = new Node*[delete_list_size_];
Node* new_chunk = AllocateNodes(kQueueInitNumNodes);
delete_list_[delete_list_count_++] = new_chunk;
queue_head_ = queue_tail_ = new_chunk;
new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1];
new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0];
waiters_.next = &waiters_;
waiters_.prev = &waiters_;
}
InfLenFIFOQueue::~InfLenFIFOQueue() {
GPR_ASSERT(count_.load(std::memory_order_relaxed) == 0);
for (size_t i = 0; i < delete_list_count_; ++i) {
delete[] delete_list_[i];
}
delete[] delete_list_;
}
void InfLenFIFOQueue::Put(void* elem) {
MutexLock l(&mu_);
int curr_count = count_.load(std::memory_order_relaxed);
if (queue_tail_ == queue_head_ && curr_count != 0) {
// List is full. Expands list to double size by inserting new chunk of nodes
Node* new_chunk = AllocateNodes(curr_count);
delete_list_[delete_list_count_++] = new_chunk;
// Expands delete list on full.
if (delete_list_count_ == delete_list_size_) {
delete_list_size_ = delete_list_size_ * 2;
delete_list_ = new Node*[delete_list_size_];
}
new_chunk[0].prev = queue_tail_->prev;
new_chunk[curr_count - 1].next = queue_head_;
queue_tail_->prev->next = new_chunk;
queue_head_->prev = &new_chunk[curr_count - 1];
queue_tail_ = new_chunk;
}
queue_tail_->content = static_cast<void*>(elem);
// Updates Stats info
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
stats_.num_started++;
gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64,
stats_.num_started);
auto current_time = gpr_now(GPR_CLOCK_MONOTONIC);
if (curr_count == 0) {
busy_time = current_time;
}
queue_tail_->insert_time = current_time;
}
count_.store(curr_count + 1, std::memory_order_relaxed);
queue_tail_ = queue_tail_->next;
TopWaiter()->cv.Signal();
}
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
MutexLock l(&mu_);
if (count_.load(std::memory_order_relaxed) == 0) {
gpr_timespec start_time;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
start_time = gpr_now(GPR_CLOCK_MONOTONIC);
}
Waiter self;
PushWaiter(&self);
do {
self.cv.Wait(&mu_);
} while (count_.load(std::memory_order_relaxed) == 0);
RemoveWaiter(&self);
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
}
}
GPR_DEBUG_ASSERT(count_.load(std::memory_order_relaxed) > 0);
return PopFront();
}
void InfLenFIFOQueue::PushWaiter(Waiter* waiter) {
waiter->next = waiters_.next;
waiter->prev = &waiters_;
waiter->next->prev = waiter;
waiter->prev->next = waiter;
}
void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
GPR_DEBUG_ASSERT(waiter != &waiters_);
waiter->next->prev = waiter->prev;
waiter->prev->next = waiter->next;
}
InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; }
} // namespace grpc_core

171
src/core/lib/iomgr/executor/mpmcqueue.h

@@ -1,171 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
#define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_thread_pool_trace;
// Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue
// interface
class MPMCQueueInterface {
public:
virtual ~MPMCQueueInterface() {}
// Puts elem into queue immediately at the end of queue.
// This might cause to block on full queue depending on implementation.
virtual void Put(void* elem) = 0;
// Removes the oldest element from the queue and return it.
// This might cause to block on empty queue depending on implementation.
// Optional argument for collecting stats purpose.
virtual void* Get(gpr_timespec* wait_time) = 0;
// Returns number of elements in the queue currently
virtual int count() const = 0;
};
class InfLenFIFOQueue : public MPMCQueueInterface {
public:
// Creates a new MPMC Queue. The queue created will have infinite length.
InfLenFIFOQueue();
// Releases all resources held by the queue. The queue must be empty, and no
// one waits on conditional variables.
~InfLenFIFOQueue() override;
// Puts elem into queue immediately at the end of queue. Since the queue has
// infinite length, this routine will never block and should never fail.
void Put(void* elem) override;
// Removes the oldest element from the queue and returns it.
// This routine will cause the thread to block if queue is currently empty.
// The wait_time argument should be passed in when the trace flag is on (for
// stats collection purposes).
void* Get(gpr_timespec* wait_time) override;
// Returns number of elements in queue currently.
// There might be concurrently add/remove on queue, so count might change
// quickly.
int count() const override { return count_.load(std::memory_order_relaxed); }
struct Node {
Node* next = nullptr; // Linking
Node* prev = nullptr;
void* content = nullptr; // Points to actual element
gpr_timespec insert_time; // Time for stats
};
// For test purpose only. Returns number of nodes allocated in queue.
// Any allocated node will be alive until the destruction of the queue.
int num_nodes() const { return num_nodes_; }
// For test purpose only. Returns the initial number of nodes in queue.
int init_num_nodes() const { return kQueueInitNumNodes; }
private:
// For Internal Use Only.
// Removes the oldest element from the queue and returns it. This routine
// will NOT check whether queue is empty, and it will NOT acquire mutex.
// Caller MUST check that queue is not empty and must acquire mutex before
// calling.
void* PopFront();
// Stats of queue. This will only be collect when debug trace mode is on.
// All printed stats info will have time measurement in microsecond.
struct Stats {
uint64_t num_started; // Number of elements have been added to queue
uint64_t num_completed; // Number of elements have been removed from
// the queue
gpr_timespec total_queue_time; // Total waiting time that all the
// removed elements have spent in queue
gpr_timespec max_queue_time; // Max waiting time among all removed
// elements
gpr_timespec busy_queue_time; // Accumulated amount of time that queue
// was not empty
Stats() {
num_started = 0;
num_completed = 0;
total_queue_time = gpr_time_0(GPR_TIMESPAN);
max_queue_time = gpr_time_0(GPR_TIMESPAN);
busy_queue_time = gpr_time_0(GPR_TIMESPAN);
}
};
// Node for waiting thread queue. Stands for one waiting thread, should have
// exact one thread waiting on its CondVar.
// Using a doubly linked list for waiting thread queue to wake up waiting
// threads in LIFO order to reduce cache misses.
struct Waiter {
CondVar cv;
Waiter* next;
Waiter* prev;
};
// Pushes waiter to the front of the queue; caller must hold the mutex.
void PushWaiter(Waiter* waiter);
// Removes waiter from the queue; caller must hold the mutex.
void RemoveWaiter(Waiter* waiter);
// Returns pointer to the waiter that should be waken up next, should be the
// last added waiter.
Waiter* TopWaiter();
Mutex mu_; // Protecting lock
Waiter waiters_; // Head of waiting thread queue
// Initial size for delete list
static const int kDeleteListInitSize = 1024;
// Initial number of nodes allocated
static const int kQueueInitNumNodes = 1024;
Node** delete_list_ = nullptr; // Keeps track of all allocated array entries
// for deleting on destruction
size_t delete_list_count_ = 0; // Number of entries in list
size_t delete_list_size_ = 0; // Size of the list. List will be expanded to
// double size on full
Node* queue_head_ = nullptr; // Head of the queue, remove position
Node* queue_tail_ = nullptr; // End of queue, insert position
std::atomic<int> count_{0}; // Number of elements in queue
int num_nodes_ = 0; // Number of nodes allocated
Stats stats_; // Stats info
gpr_timespec busy_time; // Start time of busy queue
// Internal Helper.
// Allocates an array of nodes of size "num", links all nodes together except
// the first node's prev and last node's next. They should be set by caller
// manually afterward.
Node* AllocateNodes(int num);
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */
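
For reference, a minimal sketch of how the removed InfLenFIFOQueue was typically driven (not part of this commit), mirroring the producer/consumer pattern in test/core/iomgr/mpmcqueue_test.cc below; the Task type and Example() wrapper are illustrative assumptions only:

// Illustrative only: assumes the removed mpmcqueue.h above is still on the
// include path; Task and Example() are made up for this note.
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/executor/mpmcqueue.h"

struct Task {
  int id;
};

void Example() {
  grpc_core::InfLenFIFOQueue queue;
  // Put() never blocks: when the node ring fills up, the queue doubles it.
  for (int i = 0; i < 3; ++i) {
    queue.Put(new Task{i});
  }
  // Get() blocks while the queue is empty; waiters are woken in LIFO order.
  // Passing nullptr skips the optional wait-time stats collection.
  while (queue.count() > 0) {
    Task* task = static_cast<Task*>(queue.Get(/*wait_time=*/nullptr));
    delete task;
  }
}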

136
src/core/lib/iomgr/executor/threadpool.cc

@@ -1,136 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/executor/threadpool.h"
namespace grpc_core {
void ThreadPoolWorker::Run() {
while (true) {
void* elem;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
// Updates stats and print
gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
elem = queue_->Get(&wait_time);
stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
gpr_log(GPR_INFO,
"ThreadPool Worker [%s %d] Stats: sleep_time %f",
thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
} else {
elem = queue_->Get(nullptr);
}
if (elem == nullptr) {
break;
}
// Runs closure
auto* closure = static_cast<grpc_completion_queue_functor*>(elem);
closure->functor_run(closure, closure->internal_success);
}
}
void ThreadPool::SharedThreadPoolConstructor() {
// All worker threads in thread pool must be joinable.
thread_options_.set_joinable(true);
// Create at least 1 worker thread.
if (num_threads_ <= 0) num_threads_ = 1;
queue_ = new InfLenFIFOQueue();
threads_ = static_cast<ThreadPoolWorker**>(
gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*)));
for (int i = 0; i < num_threads_; ++i) {
threads_[i] = new ThreadPoolWorker(thd_name_, queue_, thread_options_, i);
threads_[i]->Start();
}
}
size_t ThreadPool::DefaultStackSize() {
#if defined(__ANDROID__) || defined(__APPLE__)
return 1952 * 1024;
#else
return 64 * 1024;
#endif
}
void ThreadPool::AssertHasNotBeenShutDown() {
// For debug checking purpose, using RELAXED order is sufficient.
GPR_DEBUG_ASSERT(!shut_down_.load(std::memory_order_relaxed));
}
ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
thd_name_ = "ThreadPoolWorker";
thread_options_ = Thread::Options();
thread_options_.set_stack_size(DefaultStackSize());
SharedThreadPoolConstructor();
}
ThreadPool::ThreadPool(int num_threads, const char* thd_name)
: num_threads_(num_threads), thd_name_(thd_name) {
thread_options_ = Thread::Options();
thread_options_.set_stack_size(DefaultStackSize());
SharedThreadPoolConstructor();
}
ThreadPool::ThreadPool(int num_threads, const char* thd_name,
const Thread::Options& thread_options)
: num_threads_(num_threads),
thd_name_(thd_name),
thread_options_(thread_options) {
if (thread_options_.stack_size() == 0) {
thread_options_.set_stack_size(DefaultStackSize());
}
SharedThreadPoolConstructor();
}
ThreadPool::~ThreadPool() {
// For debug checking purpose, using RELAXED order is sufficient.
shut_down_.store(true, std::memory_order_relaxed);
for (int i = 0; i < num_threads_; ++i) {
queue_->Put(nullptr);
}
for (int i = 0; i < num_threads_; ++i) {
threads_[i]->Join();
}
for (int i = 0; i < num_threads_; ++i) {
delete threads_[i];
}
gpr_free(threads_);
delete queue_;
}
void ThreadPool::Add(grpc_completion_queue_functor* closure) {
AssertHasNotBeenShutDown();
queue_->Put(static_cast<void*>(closure));
}
int ThreadPool::num_pending_closures() const { return queue_->count(); }
int ThreadPool::pool_capacity() const { return num_threads_; }
const Thread::Options& ThreadPool::thread_options() const {
return thread_options_;
}
const char* ThreadPool::thread_name() const { return thd_name_; }
} // namespace grpc_core

150
src/core/lib/iomgr/executor/threadpool.h

@@ -1,150 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor/mpmcqueue.h"
namespace grpc_core {
// An abstract base class for a threadpool.
// A threadpool is an executor that maintains a pool of threads sitting around
// and waiting for closures. It also maintains a queue of pending closures;
// when closures appear in the queue, the threads in the pool pull them out
// and execute them.
class ThreadPoolInterface {
public:
// Waits for all pending closures to complete, then shuts down thread pool.
virtual ~ThreadPoolInterface() {}
// Schedules a given closure for execution later.
// Depending on the specific subclass implementation, this routine might cause
// the current thread to block (if it is unable to schedule).
// Closure should contain a function pointer and arguments it will take, more
// details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h
virtual void Add(grpc_completion_queue_functor* closure) = 0;
// Returns the current number of pending closures
virtual int num_pending_closures() const = 0;
// Returns the capacity of pool (number of worker threads in pool)
virtual int pool_capacity() const = 0;
// Thread option accessor
virtual const Thread::Options& thread_options() const = 0;
// Returns the thread name for threads in this ThreadPool.
virtual const char* thread_name() const = 0;
};
// Worker thread for threadpool. Executes closures in the queue, until getting a
// NULL closure.
class ThreadPoolWorker {
public:
ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue,
Thread::Options& options, int index)
: queue_(queue), thd_name_(thd_name), index_(index) {
thd_ = Thread(
thd_name, [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },
this, nullptr, options);
}
~ThreadPoolWorker() {}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
// struct for tracking stats of thread
struct Stats {
gpr_timespec sleep_time;
Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
};
void Run(); // Pulls closures from queue and executes them
MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from
Thread thd_; // Thread wrapped in
Stats stats_; // Stats to be collected in run time
const char* thd_name_; // Name of thread
int index_; // Index in thread pool
};
// A fixed size thread pool implementation of abstract thread pool interface.
// In this implementation, the number of threads in pool is fixed, but the
// capacity of closure queue is unlimited.
class ThreadPool : public ThreadPoolInterface {
public:
// Creates a thread pool with size of "num_threads", with default thread name
// "ThreadPoolWorker" and all thread options set to default. If the given size
// is 0 or less, there will be 1 worker thread created inside pool.
explicit ThreadPool(int num_threads);
// Same as ThreadPool(int num_threads) constructor, except
// that it also sets "thd_name" as the name of all threads in the thread pool.
ThreadPool(int num_threads, const char* thd_name);
// Same as the ThreadPool(int num_threads, const char* thd_name) constructor,
// except that it also sets thread_options for the threads.
// Notes for stack size:
// If the stack size field of the passed in Thread::Options is set to default
// value 0, default ThreadPool stack size will be used. The current default
// stack size of this implementation is 1952K for mobile platform and 64K for
// all others.
ThreadPool(int num_threads, const char* thd_name,
const Thread::Options& thread_options);
// Waits for all pending closures to complete, then shuts down thread pool.
~ThreadPool() override;
// Adds given closure into pending queue immediately. Since closure queue has
// infinite length, this routine will not block.
void Add(grpc_completion_queue_functor* closure) override;
int num_pending_closures() const override;
int pool_capacity() const override;
const Thread::Options& thread_options() const override;
const char* thread_name() const override;
private:
int num_threads_ = 0;
const char* thd_name_ = nullptr;
Thread::Options thread_options_;
ThreadPoolWorker** threads_ = nullptr; // Array of worker threads
MPMCQueueInterface* queue_ = nullptr; // Closure queue
std::atomic<bool> shut_down_{
false}; // Destructor has been called if set to true
void SharedThreadPoolConstructor();
// For ThreadPool, default stack size for mobile platform is 1952K. for other
// platforms is 64K.
size_t DefaultStackSize();
// Internal Use Only for debug checking.
void AssertHasNotBeenShutDown();
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */
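
For reference, a minimal sketch of how the removed ThreadPool API was used (not part of this commit), following the same pattern as SimpleFunctorForAdd in test/core/iomgr/threadpool_test.cc below; PrintFunctor, the pool size, and the pool name are illustrative assumptions:

// Illustrative only: assumes the removed threadpool.h above is still on the
// include path; PrintFunctor, pool size 4, and "example_pool" are made up.
#include <grpc/support/log.h>

#include "src/core/lib/iomgr/executor/threadpool.h"

// A closure embeds itself in a grpc_completion_queue_functor; a worker thread
// later invokes functor_run(self, internal_success).
struct PrintFunctor : public grpc_completion_queue_functor {
  PrintFunctor() {
    functor_run = &PrintFunctor::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }
  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
    gpr_log(GPR_INFO, "ran on a ThreadPool worker: %p", cb);
  }
};

void Example() {
  PrintFunctor functor;
  // Fixed number of worker threads; the pending-closure queue is unbounded,
  // so Add() never blocks.
  grpc_core::ThreadPool pool(4, "example_pool");
  pool.Add(&functor);
  // ~ThreadPool waits for all pending closures and joins its workers, so the
  // functor is guaranteed to have run before Example() returns.
}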

2
src/python/grpcio/grpc_core_dependencies.py

@@ -518,8 +518,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/executor/mpmcqueue.cc',
'src/core/lib/iomgr/executor/threadpool.cc',
'src/core/lib/iomgr/fork_posix.cc',
'src/core/lib/iomgr/fork_windows.cc',
'src/core/lib/iomgr/gethostname_fallback.cc',

26
test/core/iomgr/BUILD

@@ -146,19 +146,6 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "mpmcqueue_test",
srcs = ["mpmcqueue_test.cc"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "resolve_address_using_ares_resolver_posix_test",
srcs = ["resolve_address_posix_test.cc"],
@@ -292,19 +279,6 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "threadpool_test",
srcs = ["threadpool_test.cc"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "time_averaged_stats_test",
srcs = ["time_averaged_stats_test.cc"],

227
test/core/iomgr/mpmcqueue_test.cc

@@ -1,227 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/executor/mpmcqueue.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
#define TEST_NUM_ITEMS 10000
// Testing items for queue
struct WorkItem {
int index;
bool done;
explicit WorkItem(int i) : index(i) { done = false; }
};
// Thread to "produce" items and put items into queue
// It also checks that all items have been marked done and cleans up all
// produced items on destruction.
class ProducerThread {
public:
ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index,
int num_items)
: start_index_(start_index), num_items_(num_items), queue_(queue) {
items_ = nullptr;
thd_ = grpc_core::Thread(
"mpmcq_test_producer_thd",
[](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
}
~ProducerThread() {
for (int i = 0; i < num_items_; ++i) {
GPR_ASSERT(items_[i]->done);
delete items_[i];
}
delete[] items_;
}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
void Run() {
items_ = new WorkItem*[num_items_];
for (int i = 0; i < num_items_; ++i) {
items_[i] = new WorkItem(start_index_ + i);
queue_->Put(items_[i]);
}
}
int start_index_;
int num_items_;
grpc_core::InfLenFIFOQueue* queue_;
grpc_core::Thread thd_;
WorkItem** items_;
};
// Thread to pull out items from queue
class ConsumerThread {
public:
explicit ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) {
thd_ = grpc_core::Thread(
"mpmcq_test_consumer_thd",
[](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
}
~ConsumerThread() {}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
void Run() {
// count number of Get() called in this thread
int count = 0;
WorkItem* item;
while ((item = static_cast<WorkItem*>(queue_->Get(nullptr))) != nullptr) {
count++;
GPR_ASSERT(!item->done);
item->done = true;
}
gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
}
grpc_core::InfLenFIFOQueue* queue_;
grpc_core::Thread thd_;
};
static void test_FIFO(void) {
gpr_log(GPR_INFO, "test_FIFO");
grpc_core::InfLenFIFOQueue large_queue;
for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
large_queue.Put(static_cast<void*>(new WorkItem(i)));
}
GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
WorkItem* item = static_cast<WorkItem*>(large_queue.Get(nullptr));
GPR_ASSERT(i == item->index);
delete item;
}
}
// Test if queue's behavior of expanding is correct. (Only does expansion when
// it gets full, and each time expands to doubled size).
static void test_space_efficiency(void) {
gpr_log(GPR_INFO, "test_space_efficiency");
grpc_core::InfLenFIFOQueue queue;
for (int i = 0; i < queue.init_num_nodes(); ++i) {
queue.Put(static_cast<void*>(new WorkItem(i)));
}
// Queue should not have been expanded at this time.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
queue.Put(item);
}
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
delete item;
}
// Queue never shrinks even it is empty.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
GPR_ASSERT(queue.count() == 0);
// queue empty now
for (int i = 0; i < queue.init_num_nodes() * 2; ++i) {
queue.Put(static_cast<void*>(new WorkItem(i)));
}
GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2);
// Queue should have been expanded once.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
for (int i = 0; i < queue.init_num_nodes(); ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
delete item;
}
GPR_ASSERT(queue.count() == queue.init_num_nodes());
// Queue will never shrink, should keep same number of node as before.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
for (int i = 0; i < queue.init_num_nodes() + 1; ++i) {
queue.Put(static_cast<void*>(new WorkItem(i)));
}
GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1);
// Queue should have been expanded twice.
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) {
WorkItem* item = static_cast<WorkItem*>(queue.Get(nullptr));
delete item;
}
GPR_ASSERT(queue.count() == 0);
GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
gpr_log(GPR_DEBUG, "Done.");
}
static void test_many_thread(void) {
gpr_log(GPR_INFO, "test_many_thread");
const int num_producer_threads = 10;
const int num_consumer_threads = 20;
grpc_core::InfLenFIFOQueue queue;
ProducerThread** producer_threads = new ProducerThread*[num_producer_threads];
ConsumerThread** consumer_threads = new ConsumerThread*[num_consumer_threads];
gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
for (int i = 0; i < num_producer_threads; ++i) {
producer_threads[i] =
new ProducerThread(&queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS);
producer_threads[i]->Start();
}
gpr_log(GPR_DEBUG, "ProducerThreads Started.");
gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
for (int i = 0; i < num_consumer_threads; ++i) {
consumer_threads[i] = new ConsumerThread(&queue);
consumer_threads[i]->Start();
}
gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
for (int i = 0; i < num_producer_threads; ++i) {
producer_threads[i]->Join();
}
gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
for (int i = 0; i < num_consumer_threads; ++i) {
queue.Put(nullptr);
}
for (int i = 0; i < num_consumer_threads; ++i) {
consumer_threads[i]->Join();
}
gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
for (int i = 0; i < num_producer_threads; ++i) {
// Destructor of ProducerThread will do the check of WorkItems
delete producer_threads[i];
}
delete[] producer_threads;
for (int i = 0; i < num_consumer_threads; ++i) {
delete consumer_threads[i];
}
delete[] consumer_threads;
gpr_log(GPR_DEBUG, "Done.");
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
test_FIFO();
test_space_efficiency();
test_many_thread();
grpc_shutdown();
return 0;
}

189
test/core/iomgr/threadpool_test.cc

@@ -1,189 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/executor/threadpool.h"
#include "test/core/util/test_config.h"
static const int kSmallThreadPoolSize = 20;
static const int kLargeThreadPoolSize = 100;
static const int kThreadSmallIter = 100;
static const int kThreadLargeIter = 10000;
static void test_size_zero(void) {
gpr_log(GPR_INFO, "test_size_zero");
grpc_core::ThreadPool* pool_size_zero = new grpc_core::ThreadPool(0);
GPR_ASSERT(pool_size_zero->pool_capacity() == 1);
delete pool_size_zero;
}
static void test_constructor_option(void) {
gpr_log(GPR_INFO, "test_constructor_option");
// Tests options
grpc_core::Thread::Options options;
options.set_stack_size(192 * 1024); // Random non-default value
grpc_core::ThreadPool* pool =
new grpc_core::ThreadPool(0, "test_constructor_option", options);
GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size());
delete pool;
}
// Simple functor for testing. It will count how many times being called.
class SimpleFunctorForAdd : public grpc_completion_queue_functor {
public:
friend class SimpleFunctorCheckForAdd;
SimpleFunctorForAdd() {
functor_run = &SimpleFunctorForAdd::Run;
inlineable = true;
internal_next = this;
internal_success = 0;
}
~SimpleFunctorForAdd() {}
static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
callback->count_.fetch_add(1, std::memory_order_relaxed);
}
int count() { return count_.load(std::memory_order_relaxed); }
private:
std::atomic<int> count_{0};
};
static void test_add(void) {
gpr_log(GPR_INFO, "test_add");
grpc_core::ThreadPool* pool =
new grpc_core::ThreadPool(kSmallThreadPoolSize, "test_add");
SimpleFunctorForAdd* functor = new SimpleFunctorForAdd();
for (int i = 0; i < kThreadSmallIter; ++i) {
pool->Add(functor);
}
delete pool;
GPR_ASSERT(functor->count() == kThreadSmallIter);
delete functor;
gpr_log(GPR_DEBUG, "Done.");
}
// Thread that adds closures to pool
class WorkThread {
public:
WorkThread(grpc_core::ThreadPool* pool, SimpleFunctorForAdd* cb, int num_add)
: num_add_(num_add), cb_(cb), pool_(pool) {
thd_ = grpc_core::Thread(
"thread_pool_test_add_thd",
[](void* th) { static_cast<WorkThread*>(th)->Run(); }, this);
}
~WorkThread() {}
void Start() { thd_.Start(); }
void Join() { thd_.Join(); }
private:
void Run() {
for (int i = 0; i < num_add_; ++i) {
pool_->Add(cb_);
}
}
int num_add_;
SimpleFunctorForAdd* cb_;
grpc_core::ThreadPool* pool_;
grpc_core::Thread thd_;
};
static void test_multi_add(void) {
gpr_log(GPR_INFO, "test_multi_add");
const int num_work_thds = 10;
grpc_core::ThreadPool* pool =
new grpc_core::ThreadPool(kLargeThreadPoolSize, "test_multi_add");
SimpleFunctorForAdd* functor = new SimpleFunctorForAdd();
WorkThread** work_thds = static_cast<WorkThread**>(
gpr_zalloc(sizeof(WorkThread*) * num_work_thds));
gpr_log(GPR_DEBUG, "Fork threads for adding...");
for (int i = 0; i < num_work_thds; ++i) {
work_thds[i] = new WorkThread(pool, functor, kThreadLargeIter);
work_thds[i]->Start();
}
// Wait for all threads finish
gpr_log(GPR_DEBUG, "Waiting for all work threads finish...");
for (int i = 0; i < num_work_thds; ++i) {
work_thds[i]->Join();
delete work_thds[i];
}
gpr_free(work_thds);
gpr_log(GPR_DEBUG, "Done.");
gpr_log(GPR_DEBUG, "Waiting for all closures finish...");
// Destructor of thread pool will wait for all closures to finish
delete pool;
GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds);
delete functor;
gpr_log(GPR_DEBUG, "Done.");
}
// Checks the current count with a given number.
class SimpleFunctorCheckForAdd : public grpc_completion_queue_functor {
public:
SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) {
functor_run = &SimpleFunctorCheckForAdd::Run;
inlineable = true;
internal_success = ok;
}
~SimpleFunctorCheckForAdd() {}
static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb);
(*callback->count_)++;
GPR_ASSERT(*callback->count_ == callback->internal_success);
}
private:
int* count_;
};
static void test_one_thread_FIFO(void) {
gpr_log(GPR_INFO, "test_one_thread_FIFO");
int counter = 0;
grpc_core::ThreadPool* pool =
new grpc_core::ThreadPool(1, "test_one_thread_FIFO");
SimpleFunctorCheckForAdd** check_functors =
static_cast<SimpleFunctorCheckForAdd**>(
gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter));
for (int i = 0; i < kThreadSmallIter; ++i) {
check_functors[i] = new SimpleFunctorCheckForAdd(i + 1, &counter);
pool->Add(check_functors[i]);
}
// Destructor of pool will wait until all closures finished.
delete pool;
for (int i = 0; i < kThreadSmallIter; ++i) {
delete check_functors[i];
}
gpr_free(check_functors);
gpr_log(GPR_DEBUG, "Done.");
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
test_size_zero();
test_constructor_option();
test_add();
test_multi_add();
test_one_thread_FIFO();
grpc_shutdown();
return 0;
}

15
test/cpp/microbenchmarks/BUILD

@@ -289,21 +289,6 @@ grpc_cc_test(
deps = [":helpers"],
)
grpc_cc_test(
name = "bm_threadpool",
size = "large",
srcs = ["bm_threadpool.cc"],
args = grpc_benchmark_args(),
tags = [
"manual",
"no_windows",
"notap",
],
uses_event_engine = False,
uses_polling = False,
deps = [":helpers"],
)
grpc_cc_library(
name = "bm_callback_test_service_impl",
testonly = 1,

331
test/cpp/microbenchmarks/bm_threadpool.cc

@@ -1,331 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <condition_variable>
#include <mutex>
#include <benchmark/benchmark.h>
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/executor/threadpool.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"
namespace grpc {
namespace testing {
// This helper class allows a thread to block for a pre-specified number of
// actions. BlockingCounter has an initial non-negative count on initialization.
// Each call to DecrementCount will decrease the count by 1. When making a call
// to Wait, if the count is greater than 0, the thread will be blocked, until
// the count reaches 0.
class BlockingCounter {
public:
explicit BlockingCounter(int count) : count_(count) {}
void DecrementCount() {
std::lock_guard<std::mutex> l(mu_);
count_--;
if (count_ == 0) cv_.notify_all();
}
void Wait() {
std::unique_lock<std::mutex> l(mu_);
while (count_ > 0) {
cv_.wait(l);
}
}
private:
int count_;
std::mutex mu_;
std::condition_variable cv_;
};
// This is a functor/closure class for threadpool microbenchmark.
// This functor (closure) class will add another functor into pool if the
// number passed in (num_add) is greater than 0. Otherwise, it will decrement
// the counter to indicate that task is finished. This functor will suicide at
// the end, therefore, no need for caller to do clean-ups.
class AddAnotherFunctor : public grpc_completion_queue_functor {
public:
AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
int num_add)
: pool_(pool), counter_(counter), num_add_(num_add) {
functor_run = &AddAnotherFunctor::Run;
inlineable = false;
internal_next = this;
internal_success = 0;
}
// When the functor gets to run in thread pool, it will take itself as first
// argument and internal_success as second one.
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<AddAnotherFunctor*>(cb);
if (--callback->num_add_ > 0) {
callback->pool_->Add(new AddAnotherFunctor(
callback->pool_, callback->counter_, callback->num_add_));
} else {
callback->counter_->DecrementCount();
}
// Suicides.
delete callback;
}
private:
grpc_core::ThreadPool* pool_;
BlockingCounter* counter_;
int num_add_;
};
template <int kConcurrentFunctor>
static void ThreadPoolAddAnother(benchmark::State& state) {
const int num_iterations = state.range(0);
const int num_threads = state.range(1);
// Number of adds done by each closure.
const int num_add = num_iterations / kConcurrentFunctor;
grpc_core::ThreadPool pool(num_threads);
while (state.KeepRunningBatch(num_iterations)) {
BlockingCounter counter(kConcurrentFunctor);
for (int i = 0; i < kConcurrentFunctor; ++i) {
pool.Add(new AddAnotherFunctor(&pool, &counter, num_add));
}
counter.Wait();
}
state.SetItemsProcessed(state.iterations());
}
// First pair of arguments is range for number of iterations (num_iterations).
// Second pair of arguments is range for thread pool size (num_threads).
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 16)
->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 32)
->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 64)
->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 128)
->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 512)
->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 2048)
->RangePair(524288, 524288, 1, 1024);
// A functor class that will delete self on end of running.
class SuicideFunctorForAdd : public grpc_completion_queue_functor {
public:
explicit SuicideFunctorForAdd(BlockingCounter* counter) : counter_(counter) {
functor_run = &SuicideFunctorForAdd::Run;
inlineable = false;
internal_next = this;
internal_success = 0;
}
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
// On running, the first argument would be itself.
auto* callback = static_cast<SuicideFunctorForAdd*>(cb);
callback->counter_->DecrementCount();
delete callback;
}
private:
BlockingCounter* counter_;
};
// Performs the scenario of external thread(s) adding closures into pool.
static void BM_ThreadPoolExternalAdd(benchmark::State& state) {
static grpc_core::ThreadPool* external_add_pool = nullptr;
int thread_idx = state.thread_index();
// Setup for each run of test.
if (thread_idx == 0) {
const int num_threads = state.range(1);
external_add_pool = new grpc_core::ThreadPool(num_threads);
}
const int num_iterations = state.range(0) / state.threads();
while (state.KeepRunningBatch(num_iterations)) {
BlockingCounter counter(num_iterations);
for (int i = 0; i < num_iterations; ++i) {
external_add_pool->Add(new SuicideFunctorForAdd(&counter));
}
counter.Wait();
}
// Teardown at the end of each test run.
if (thread_idx == 0) {
state.SetItemsProcessed(state.range(0));
delete external_add_pool;
}
}
BENCHMARK(BM_ThreadPoolExternalAdd)
// First pair is range for number of iterations (num_iterations).
// Second pair is range for thread pool size (num_threads).
->RangePair(524288, 524288, 1, 1024)
->ThreadRange(1, 256); // Concurrent external thread(s) up to 256
// Functor (closure) that adds itself into pool repeatedly. By adding self, the
// overhead would be low and can measure the time of add more accurately.
class AddSelfFunctor : public grpc_completion_queue_functor {
public:
AddSelfFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
int num_add)
: pool_(pool), counter_(counter), num_add_(num_add) {
functor_run = &AddSelfFunctor::Run;
inlineable = false;
internal_next = this;
internal_success = 0;
}
// When the functor gets to run in thread pool, it will take itself as first
// argument and internal_success as second one.
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<AddSelfFunctor*>(cb);
if (--callback->num_add_ > 0) {
callback->pool_->Add(cb);
} else {
callback->counter_->DecrementCount();
// Suicides.
delete callback;
}
}
private:
grpc_core::ThreadPool* pool_;
BlockingCounter* counter_;
int num_add_;
};
template <int kConcurrentFunctor>
static void ThreadPoolAddSelf(benchmark::State& state) {
const int num_iterations = state.range(0);
const int num_threads = state.range(1);
// Number of adds done by each closure.
const int num_add = num_iterations / kConcurrentFunctor;
grpc_core::ThreadPool pool(num_threads);
while (state.KeepRunningBatch(num_iterations)) {
BlockingCounter counter(kConcurrentFunctor);
for (int i = 0; i < kConcurrentFunctor; ++i) {
pool.Add(new AddSelfFunctor(&pool, &counter, num_add));
}
counter.Wait();
}
state.SetItemsProcessed(state.iterations());
}
// First pair of arguments is range for number of iterations (num_iterations).
// Second pair of arguments is range for thread pool size (num_threads).
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 16)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 32)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 64)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 128)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 512)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024);
#if defined(__GNUC__) && !defined(SWIG)
#if defined(__i386__) || defined(__x86_64__)
#define CACHELINE_SIZE 64
#elif defined(__powerpc64__)
#define CACHELINE_SIZE 128
#elif defined(__aarch64__)
#define CACHELINE_SIZE 64
#elif defined(__arm__)
#if defined(__ARM_ARCH_5T__)
#define CACHELINE_SIZE 32
#elif defined(__ARM_ARCH_7A__)
#define CACHELINE_SIZE 64
#endif
#endif
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 64
#endif
#endif
// A functor (closure) that simulates closures with small but non-trivial amount
// of work.
class ShortWorkFunctorForAdd : public grpc_completion_queue_functor {
public:
BlockingCounter* counter_;
ShortWorkFunctorForAdd() {
functor_run = &ShortWorkFunctorForAdd::Run;
inlineable = false;
internal_next = this;
internal_success = 0;
val_ = 0;
}
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
// Uses pad to avoid compiler complaining unused variable error.
callback->pad[0] = 0;
for (int i = 0; i < 1000; ++i) {
callback->val_++;
}
callback->counter_->DecrementCount();
}
private:
char pad[CACHELINE_SIZE];
volatile int val_;
};
// Simulates workloads where many short running callbacks are added to the
// threadpool. The callbacks are not enough to keep all the workers busy
// continuously, so the number of running workers changes over time.
//
// In effect this tests how well the threadpool avoids spurious wakeups.
static void BM_SpikyLoad(benchmark::State& state) {
const int num_threads = state.range(0);
const int kNumSpikes = 1000;
const int batch_size = 3 * num_threads;
std::vector<ShortWorkFunctorForAdd> work_vector(batch_size);
grpc_core::ThreadPool pool(num_threads);
while (state.KeepRunningBatch(kNumSpikes * batch_size)) {
for (int i = 0; i != kNumSpikes; ++i) {
BlockingCounter counter(batch_size);
for (auto& w : work_vector) {
w.counter_ = &counter;
pool.Add(&w);
}
counter.Wait();
}
}
state.SetItemsProcessed(state.iterations() * batch_size);
}
BENCHMARK(BM_SpikyLoad)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16);
} // namespace testing
} // namespace grpc
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char* argv[]) {
grpc::testing::TestEnvironment env(&argc, argv);
LibraryInitializer libInit;
::benchmark::Initialize(&argc, argv);
grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();
return 0;
}

4
tools/doxygen/Doxyfile.c++.internal

@@ -2089,10 +2089,6 @@ src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/mpmcqueue.h \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/executor/threadpool.h \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname.h \

4
tools/doxygen/Doxyfile.core.internal

@@ -1883,10 +1883,6 @@ src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/executor/mpmcqueue.cc \
src/core/lib/iomgr/executor/mpmcqueue.h \
src/core/lib/iomgr/executor/threadpool.cc \
src/core/lib/iomgr/executor/threadpool.h \
src/core/lib/iomgr/fork_posix.cc \
src/core/lib/iomgr/fork_windows.cc \
src/core/lib/iomgr/gethostname.h \

48
tools/run_tests/generated/tests.json

@@ -823,30 +823,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "mpmcqueue_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@@ -1411,30 +1387,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "threadpool_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
