From 1b5295a4a2d581291cadca0af08bde16824dfe08 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 8 Jul 2022 12:07:36 -0700 Subject: [PATCH] [iomgr] Remove executor/... I've tried this before in #27445 and we found some internal usage of this code. Today I can find no such usage, so let's try again. --- BUILD | 4 - CMakeLists.txt | 60 ---- Makefile | 4 - build_autogenerated.yaml | 26 -- config.m4 | 3 - config.w32 | 3 - gRPC-C++.podspec | 4 - gRPC-Core.podspec | 6 - grpc.gemspec | 4 - grpc.gyp | 4 - package.xml | 4 - src/core/lib/iomgr/executor/mpmcqueue.cc | 182 ----------- src/core/lib/iomgr/executor/mpmcqueue.h | 171 ---------- src/core/lib/iomgr/executor/threadpool.cc | 136 -------- src/core/lib/iomgr/executor/threadpool.h | 150 --------- src/python/grpcio/grpc_core_dependencies.py | 2 - test/core/iomgr/BUILD | 26 -- test/core/iomgr/mpmcqueue_test.cc | 227 -------------- test/core/iomgr/threadpool_test.cc | 189 ----------- test/cpp/microbenchmarks/BUILD | 15 - test/cpp/microbenchmarks/bm_threadpool.cc | 331 -------------------- tools/doxygen/Doxyfile.c++.internal | 4 - tools/doxygen/Doxyfile.core.internal | 4 - tools/run_tests/generated/tests.json | 48 --- 24 files changed, 1607 deletions(-) delete mode 100644 src/core/lib/iomgr/executor/mpmcqueue.cc delete mode 100644 src/core/lib/iomgr/executor/mpmcqueue.h delete mode 100644 src/core/lib/iomgr/executor/threadpool.cc delete mode 100644 src/core/lib/iomgr/executor/threadpool.h delete mode 100644 test/core/iomgr/mpmcqueue_test.cc delete mode 100644 test/core/iomgr/threadpool_test.cc delete mode 100644 test/cpp/microbenchmarks/bm_threadpool.cc diff --git a/BUILD b/BUILD index ebf19d82fc4..243822f871a 100644 --- a/BUILD +++ b/BUILD @@ -2538,8 +2538,6 @@ grpc_cc_library( "src/core/lib/iomgr/ev_poll_posix.cc", "src/core/lib/iomgr/ev_posix.cc", "src/core/lib/iomgr/ev_windows.cc", - "src/core/lib/iomgr/executor/mpmcqueue.cc", - "src/core/lib/iomgr/executor/threadpool.cc", "src/core/lib/iomgr/fork_posix.cc", "src/core/lib/iomgr/fork_windows.cc", "src/core/lib/iomgr/gethostname_fallback.cc", @@ -2654,8 +2652,6 @@ grpc_cc_library( "src/core/lib/iomgr/ev_epoll1_linux.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", - "src/core/lib/iomgr/executor/mpmcqueue.h", - "src/core/lib/iomgr/executor/threadpool.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/grpc_if_nametoindex.h", "src/core/lib/iomgr/internal_errqueue.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index b475830375e..5150b1cd6f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -828,7 +828,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c load_file_test) add_dependencies(buildtests_c message_compress_test) add_dependencies(buildtests_c minimal_stack_is_minimal_test) - add_dependencies(buildtests_c mpmcqueue_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_c mpscq_test) endif() @@ -867,7 +866,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c test_core_iomgr_timer_heap_test) add_dependencies(buildtests_c test_core_iomgr_timer_list_test) add_dependencies(buildtests_c thd_test) - add_dependencies(buildtests_c threadpool_test) add_dependencies(buildtests_c varint_test) add_custom_target(buildtests_cxx) @@ -2102,8 +2100,6 @@ add_library(grpc src/core/lib/iomgr/ev_windows.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/executor/mpmcqueue.cc - src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -2732,8 +2728,6 @@ add_library(grpc_unsecure src/core/lib/iomgr/ev_windows.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/executor/mpmcqueue.cc - src/core/lib/iomgr/executor/threadpool.cc src/core/lib/iomgr/fork_posix.cc src/core/lib/iomgr/fork_windows.cc src/core/lib/iomgr/gethostname_fallback.cc @@ -5264,33 +5258,6 @@ target_link_libraries(minimal_stack_is_minimal_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(mpmcqueue_test - test/core/iomgr/mpmcqueue_test.cc -) - -target_include_directories(mpmcqueue_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} -) - -target_link_libraries(mpmcqueue_test - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -5965,33 +5932,6 @@ target_link_libraries(thd_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(threadpool_test - test/core/iomgr/threadpool_test.cc -) - -target_include_directories(threadpool_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} -) - -target_link_libraries(threadpool_test - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 3926b36609a..a002c1dbba8 100644 --- a/Makefile +++ b/Makefile @@ -1484,8 +1484,6 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/ev_windows.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ - src/core/lib/iomgr/executor/mpmcqueue.cc \ - src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -1954,8 +1952,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/ev_windows.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ - src/core/lib/iomgr/executor/mpmcqueue.cc \ - src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7b4b2bc26a6..d464d539ced 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -793,8 +793,6 @@ libs: - src/core/lib/iomgr/ev_posix.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - - src/core/lib/iomgr/executor/mpmcqueue.h - - src/core/lib/iomgr/executor/threadpool.h - src/core/lib/iomgr/gethostname.h - src/core/lib/iomgr/grpc_if_nametoindex.h - src/core/lib/iomgr/internal_errqueue.h @@ -1463,8 +1461,6 @@ libs: - src/core/lib/iomgr/ev_windows.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - - src/core/lib/iomgr/executor/mpmcqueue.cc - - src/core/lib/iomgr/executor/threadpool.cc - src/core/lib/iomgr/fork_posix.cc - src/core/lib/iomgr/fork_windows.cc - src/core/lib/iomgr/gethostname_fallback.cc @@ -2011,8 +2007,6 @@ libs: - src/core/lib/iomgr/ev_posix.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - - src/core/lib/iomgr/executor/mpmcqueue.h - - src/core/lib/iomgr/executor/threadpool.h - src/core/lib/iomgr/gethostname.h - src/core/lib/iomgr/grpc_if_nametoindex.h - src/core/lib/iomgr/internal_errqueue.h @@ -2329,8 +2323,6 @@ libs: - src/core/lib/iomgr/ev_windows.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - - src/core/lib/iomgr/executor/mpmcqueue.cc - - src/core/lib/iomgr/executor/threadpool.cc - src/core/lib/iomgr/fork_posix.cc - src/core/lib/iomgr/fork_windows.cc - src/core/lib/iomgr/gethostname_fallback.cc @@ -3565,15 +3557,6 @@ targets: deps: - grpc_test_util uses_polling: false -- name: mpmcqueue_test - build: test - language: c - headers: [] - src: - - test/core/iomgr/mpmcqueue_test.cc - deps: - - grpc_test_util - uses_polling: false - name: mpscq_test build: test language: c @@ -3834,15 +3817,6 @@ targets: deps: - grpc_test_util uses_polling: false -- name: threadpool_test - build: test - language: c - headers: [] - src: - - test/core/iomgr/threadpool_test.cc - deps: - - grpc_test_util - uses_polling: false - name: varint_test build: test language: c diff --git a/config.m4 b/config.m4 index 9a44118996e..d8f8ed0bdd8 100644 --- a/config.m4 +++ b/config.m4 @@ -543,8 +543,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/ev_windows.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ - src/core/lib/iomgr/executor/mpmcqueue.cc \ - src/core/lib/iomgr/executor/threadpool.cc \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname_fallback.cc \ @@ -1323,7 +1321,6 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/gprpp) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/http) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr) - PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr/executor) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/json) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/matchers) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/profiling) diff --git a/config.w32 b/config.w32 index 174fcf9166b..1504b4d95d2 100644 --- a/config.w32 +++ b/config.w32 @@ -509,8 +509,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\ev_windows.cc " + "src\\core\\lib\\iomgr\\exec_ctx.cc " + "src\\core\\lib\\iomgr\\executor.cc " + - "src\\core\\lib\\iomgr\\executor\\mpmcqueue.cc " + - "src\\core\\lib\\iomgr\\executor\\threadpool.cc " + "src\\core\\lib\\iomgr\\fork_posix.cc " + "src\\core\\lib\\iomgr\\fork_windows.cc " + "src\\core\\lib\\iomgr\\gethostname_fallback.cc " + @@ -1445,7 +1443,6 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\gprpp"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\http"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr"); - FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr\\executor"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\json"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\matchers"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\profiling"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index f44860393e7..c54f97be7fb 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -751,8 +751,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', - 'src/core/lib/iomgr/executor/mpmcqueue.h', - 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', @@ -1591,8 +1589,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', - 'src/core/lib/iomgr/executor/mpmcqueue.h', - 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3e5702cd5bf..67eaf366db8 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1192,10 +1192,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/executor.h', - 'src/core/lib/iomgr/executor/mpmcqueue.cc', - 'src/core/lib/iomgr/executor/mpmcqueue.h', - 'src/core/lib/iomgr/executor/threadpool.cc', - 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname.h', @@ -2203,8 +2199,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', - 'src/core/lib/iomgr/executor/mpmcqueue.h', - 'src/core/lib/iomgr/executor/threadpool.h', 'src/core/lib/iomgr/gethostname.h', 'src/core/lib/iomgr/grpc_if_nametoindex.h', 'src/core/lib/iomgr/internal_errqueue.h', diff --git a/grpc.gemspec b/grpc.gemspec index 0cd4fcdcf9b..bc1e4c3a209 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1106,10 +1106,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/exec_ctx.h ) s.files += %w( src/core/lib/iomgr/executor.cc ) s.files += %w( src/core/lib/iomgr/executor.h ) - s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.cc ) - s.files += %w( src/core/lib/iomgr/executor/mpmcqueue.h ) - s.files += %w( src/core/lib/iomgr/executor/threadpool.cc ) - s.files += %w( src/core/lib/iomgr/executor/threadpool.h ) s.files += %w( src/core/lib/iomgr/fork_posix.cc ) s.files += %w( src/core/lib/iomgr/fork_windows.cc ) s.files += %w( src/core/lib/iomgr/gethostname.h ) diff --git a/grpc.gyp b/grpc.gyp index 75ac5727a72..6fb84b8bf2c 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -834,8 +834,6 @@ 'src/core/lib/iomgr/ev_windows.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', - 'src/core/lib/iomgr/executor/mpmcqueue.cc', - 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', @@ -1296,8 +1294,6 @@ 'src/core/lib/iomgr/ev_windows.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', - 'src/core/lib/iomgr/executor/mpmcqueue.cc', - 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', diff --git a/package.xml b/package.xml index 85bdb8d3552..ec796847689 100644 --- a/package.xml +++ b/package.xml @@ -1088,10 +1088,6 @@ - - - - diff --git a/src/core/lib/iomgr/executor/mpmcqueue.cc b/src/core/lib/iomgr/executor/mpmcqueue.cc deleted file mode 100644 index e4d9d21dc31..00000000000 --- a/src/core/lib/iomgr/executor/mpmcqueue.cc +++ /dev/null @@ -1,182 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/lib/iomgr/executor/mpmcqueue.h" - -namespace grpc_core { - -DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool"); - -inline void* InfLenFIFOQueue::PopFront() { - // Caller should already check queue is not empty and has already held the - // mutex. This function will assume that there is at least one element in the - // queue (i.e. queue_head_->content is valid). - void* result = queue_head_->content; - count_.store(count_.load(std::memory_order_relaxed) - 1, - std::memory_order_relaxed); - - // Updates Stats when trace flag turned on. - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - gpr_timespec wait_time = - gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time); - stats_.num_completed++; - stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time); - stats_.max_queue_time = gpr_time_max( - gpr_convert_clock_type(stats_.max_queue_time, GPR_TIMESPAN), wait_time); - - if (count_.load(std::memory_order_relaxed) == 0) { - stats_.busy_queue_time = - gpr_time_add(stats_.busy_queue_time, - gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), busy_time)); - } - - gpr_log(GPR_INFO, - "[InfLenFIFOQueue PopFront] num_completed: %" PRIu64 - " total_queue_time: %f max_queue_time: %f busy_queue_time: %f", - stats_.num_completed, - gpr_timespec_to_micros(stats_.total_queue_time), - gpr_timespec_to_micros(stats_.max_queue_time), - gpr_timespec_to_micros(stats_.busy_queue_time)); - } - - queue_head_ = queue_head_->next; - // Signal waiting thread - if (count_.load(std::memory_order_relaxed) > 0) { - TopWaiter()->cv.Signal(); - } - - return result; -} - -InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) { - num_nodes_ = num_nodes_ + num; - Node* new_chunk = new Node[num]; - new_chunk[0].next = &new_chunk[1]; - new_chunk[num - 1].prev = &new_chunk[num - 2]; - for (int i = 1; i < num - 1; ++i) { - new_chunk[i].prev = &new_chunk[i - 1]; - new_chunk[i].next = &new_chunk[i + 1]; - } - return new_chunk; -} - -InfLenFIFOQueue::InfLenFIFOQueue() { - delete_list_size_ = kDeleteListInitSize; - delete_list_ = new Node*[delete_list_size_]; - - Node* new_chunk = AllocateNodes(kQueueInitNumNodes); - delete_list_[delete_list_count_++] = new_chunk; - queue_head_ = queue_tail_ = new_chunk; - new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1]; - new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0]; - - waiters_.next = &waiters_; - waiters_.prev = &waiters_; -} - -InfLenFIFOQueue::~InfLenFIFOQueue() { - GPR_ASSERT(count_.load(std::memory_order_relaxed) == 0); - for (size_t i = 0; i < delete_list_count_; ++i) { - delete[] delete_list_[i]; - } - delete[] delete_list_; -} - -void InfLenFIFOQueue::Put(void* elem) { - MutexLock l(&mu_); - - int curr_count = count_.load(std::memory_order_relaxed); - - if (queue_tail_ == queue_head_ && curr_count != 0) { - // List is full. Expands list to double size by inserting new chunk of nodes - Node* new_chunk = AllocateNodes(curr_count); - delete_list_[delete_list_count_++] = new_chunk; - // Expands delete list on full. - if (delete_list_count_ == delete_list_size_) { - delete_list_size_ = delete_list_size_ * 2; - delete_list_ = new Node*[delete_list_size_]; - } - new_chunk[0].prev = queue_tail_->prev; - new_chunk[curr_count - 1].next = queue_head_; - queue_tail_->prev->next = new_chunk; - queue_head_->prev = &new_chunk[curr_count - 1]; - queue_tail_ = new_chunk; - } - queue_tail_->content = static_cast(elem); - - // Updates Stats info - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - stats_.num_started++; - gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started: %" PRIu64, - stats_.num_started); - auto current_time = gpr_now(GPR_CLOCK_MONOTONIC); - if (curr_count == 0) { - busy_time = current_time; - } - queue_tail_->insert_time = current_time; - } - - count_.store(curr_count + 1, std::memory_order_relaxed); - queue_tail_ = queue_tail_->next; - - TopWaiter()->cv.Signal(); -} - -void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) { - MutexLock l(&mu_); - - if (count_.load(std::memory_order_relaxed) == 0) { - gpr_timespec start_time; - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && - wait_time != nullptr) { - start_time = gpr_now(GPR_CLOCK_MONOTONIC); - } - - Waiter self; - PushWaiter(&self); - do { - self.cv.Wait(&mu_); - } while (count_.load(std::memory_order_relaxed) == 0); - RemoveWaiter(&self); - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) && - wait_time != nullptr) { - *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time); - } - } - GPR_DEBUG_ASSERT(count_.load(std::memory_order_relaxed) > 0); - return PopFront(); -} - -void InfLenFIFOQueue::PushWaiter(Waiter* waiter) { - waiter->next = waiters_.next; - waiter->prev = &waiters_; - waiter->next->prev = waiter; - waiter->prev->next = waiter; -} - -void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) { - GPR_DEBUG_ASSERT(waiter != &waiters_); - waiter->next->prev = waiter->prev; - waiter->prev->next = waiter->next; -} - -InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; } - -} // namespace grpc_core diff --git a/src/core/lib/iomgr/executor/mpmcqueue.h b/src/core/lib/iomgr/executor/mpmcqueue.h deleted file mode 100644 index f3bc346e487..00000000000 --- a/src/core/lib/iomgr/executor/mpmcqueue.h +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H -#define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H - -#include - -#include - -#include "src/core/lib/debug/stats.h" -#include "src/core/lib/gprpp/sync.h" - -namespace grpc_core { - -extern DebugOnlyTraceFlag grpc_thread_pool_trace; - -// Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue -// interface -class MPMCQueueInterface { - public: - virtual ~MPMCQueueInterface() {} - - // Puts elem into queue immediately at the end of queue. - // This might cause to block on full queue depending on implementation. - virtual void Put(void* elem) = 0; - - // Removes the oldest element from the queue and return it. - // This might cause to block on empty queue depending on implementation. - // Optional argument for collecting stats purpose. - virtual void* Get(gpr_timespec* wait_time) = 0; - - // Returns number of elements in the queue currently - virtual int count() const = 0; -}; - -class InfLenFIFOQueue : public MPMCQueueInterface { - public: - // Creates a new MPMC Queue. The queue created will have infinite length. - InfLenFIFOQueue(); - - // Releases all resources held by the queue. The queue must be empty, and no - // one waits on conditional variables. - ~InfLenFIFOQueue() override; - - // Puts elem into queue immediately at the end of queue. Since the queue has - // infinite length, this routine will never block and should never fail. - void Put(void* elem) override; - - // Removes the oldest element from the queue and returns it. - // This routine will cause the thread to block if queue is currently empty. - // Argument wait_time should be passed in when trace flag turning on (for - // collecting stats info purpose.) - void* Get(gpr_timespec* wait_time) override; - - // Returns number of elements in queue currently. - // There might be concurrently add/remove on queue, so count might change - // quickly. - int count() const override { return count_.load(std::memory_order_relaxed); } - - struct Node { - Node* next = nullptr; // Linking - Node* prev = nullptr; - void* content = nullptr; // Points to actual element - gpr_timespec insert_time; // Time for stats - }; - - // For test purpose only. Returns number of nodes allocated in queue. - // Any allocated node will be alive until the destruction of the queue. - int num_nodes() const { return num_nodes_; } - - // For test purpose only. Returns the initial number of nodes in queue. - int init_num_nodes() const { return kQueueInitNumNodes; } - - private: - // For Internal Use Only. - // Removes the oldest element from the queue and returns it. This routine - // will NOT check whether queue is empty, and it will NOT acquire mutex. - // Caller MUST check that queue is not empty and must acquire mutex before - // callling. - void* PopFront(); - - // Stats of queue. This will only be collect when debug trace mode is on. - // All printed stats info will have time measurement in microsecond. - struct Stats { - uint64_t num_started; // Number of elements have been added to queue - uint64_t num_completed; // Number of elements have been removed from - // the queue - gpr_timespec total_queue_time; // Total waiting time that all the - // removed elements have spent in queue - gpr_timespec max_queue_time; // Max waiting time among all removed - // elements - gpr_timespec busy_queue_time; // Accumulated amount of time that queue - // was not empty - - Stats() { - num_started = 0; - num_completed = 0; - total_queue_time = gpr_time_0(GPR_TIMESPAN); - max_queue_time = gpr_time_0(GPR_TIMESPAN); - busy_queue_time = gpr_time_0(GPR_TIMESPAN); - } - }; - - // Node for waiting thread queue. Stands for one waiting thread, should have - // exact one thread waiting on its CondVar. - // Using a doubly linked list for waiting thread queue to wake up waiting - // threads in LIFO order to reduce cache misses. - struct Waiter { - CondVar cv; - Waiter* next; - Waiter* prev; - }; - - // Pushs waiter to the front of queue, require caller held mutex - void PushWaiter(Waiter* waiter); - - // Removes waiter from queue, require caller held mutex - void RemoveWaiter(Waiter* waiter); - - // Returns pointer to the waiter that should be waken up next, should be the - // last added waiter. - Waiter* TopWaiter(); - - Mutex mu_; // Protecting lock - Waiter waiters_; // Head of waiting thread queue - - // Initial size for delete list - static const int kDeleteListInitSize = 1024; - // Initial number of nodes allocated - static const int kQueueInitNumNodes = 1024; - - Node** delete_list_ = nullptr; // Keeps track of all allocated array entries - // for deleting on destruction - size_t delete_list_count_ = 0; // Number of entries in list - size_t delete_list_size_ = 0; // Size of the list. List will be expanded to - // double size on full - - Node* queue_head_ = nullptr; // Head of the queue, remove position - Node* queue_tail_ = nullptr; // End of queue, insert position - std::atomic count_{0}; // Number of elements in queue - int num_nodes_ = 0; // Number of nodes allocated - - Stats stats_; // Stats info - gpr_timespec busy_time; // Start time of busy queue - - // Internal Helper. - // Allocates an array of nodes of size "num", links all nodes together except - // the first node's prev and last node's next. They should be set by caller - // manually afterward. - Node* AllocateNodes(int num); -}; - -} // namespace grpc_core - -#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */ diff --git a/src/core/lib/iomgr/executor/threadpool.cc b/src/core/lib/iomgr/executor/threadpool.cc deleted file mode 100644 index 8bd954ccd0c..00000000000 --- a/src/core/lib/iomgr/executor/threadpool.cc +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/lib/iomgr/executor/threadpool.h" - -namespace grpc_core { - -void ThreadPoolWorker::Run() { - while (true) { - void* elem; - - if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) { - // Updates stats and print - gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN); - elem = queue_->Get(&wait_time); - stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time); - gpr_log(GPR_INFO, - "ThreadPool Worker [%s %d] Stats: sleep_time %f", - thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time)); - } else { - elem = queue_->Get(nullptr); - } - if (elem == nullptr) { - break; - } - // Runs closure - auto* closure = static_cast(elem); - closure->functor_run(closure, closure->internal_success); - } -} - -void ThreadPool::SharedThreadPoolConstructor() { - // All worker threads in thread pool must be joinable. - thread_options_.set_joinable(true); - - // Create at least 1 worker thread. - if (num_threads_ <= 0) num_threads_ = 1; - - queue_ = new InfLenFIFOQueue(); - threads_ = static_cast( - gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*))); - for (int i = 0; i < num_threads_; ++i) { - threads_[i] = new ThreadPoolWorker(thd_name_, queue_, thread_options_, i); - threads_[i]->Start(); - } -} - -size_t ThreadPool::DefaultStackSize() { -#if defined(__ANDROID__) || defined(__APPLE__) - return 1952 * 1024; -#else - return 64 * 1024; -#endif -} - -void ThreadPool::AssertHasNotBeenShutDown() { - // For debug checking purpose, using RELAXED order is sufficient. - GPR_DEBUG_ASSERT(!shut_down_.load(std::memory_order_relaxed)); -} - -ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) { - thd_name_ = "ThreadPoolWorker"; - thread_options_ = Thread::Options(); - thread_options_.set_stack_size(DefaultStackSize()); - SharedThreadPoolConstructor(); -} - -ThreadPool::ThreadPool(int num_threads, const char* thd_name) - : num_threads_(num_threads), thd_name_(thd_name) { - thread_options_ = Thread::Options(); - thread_options_.set_stack_size(DefaultStackSize()); - SharedThreadPoolConstructor(); -} - -ThreadPool::ThreadPool(int num_threads, const char* thd_name, - const Thread::Options& thread_options) - : num_threads_(num_threads), - thd_name_(thd_name), - thread_options_(thread_options) { - if (thread_options_.stack_size() == 0) { - thread_options_.set_stack_size(DefaultStackSize()); - } - SharedThreadPoolConstructor(); -} - -ThreadPool::~ThreadPool() { - // For debug checking purpose, using RELAXED order is sufficient. - shut_down_.store(true, std::memory_order_relaxed); - - for (int i = 0; i < num_threads_; ++i) { - queue_->Put(nullptr); - } - - for (int i = 0; i < num_threads_; ++i) { - threads_[i]->Join(); - } - - for (int i = 0; i < num_threads_; ++i) { - delete threads_[i]; - } - gpr_free(threads_); - delete queue_; -} - -void ThreadPool::Add(grpc_completion_queue_functor* closure) { - AssertHasNotBeenShutDown(); - queue_->Put(static_cast(closure)); -} - -int ThreadPool::num_pending_closures() const { return queue_->count(); } - -int ThreadPool::pool_capacity() const { return num_threads_; } - -const Thread::Options& ThreadPool::thread_options() const { - return thread_options_; -} - -const char* ThreadPool::thread_name() const { return thd_name_; } -} // namespace grpc_core diff --git a/src/core/lib/iomgr/executor/threadpool.h b/src/core/lib/iomgr/executor/threadpool.h deleted file mode 100644 index e0e5eb07cd0..00000000000 --- a/src/core/lib/iomgr/executor/threadpool.h +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H -#define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H - -#include - -#include - -#include "src/core/lib/gprpp/thd.h" -#include "src/core/lib/iomgr/executor/mpmcqueue.h" - -namespace grpc_core { - -// A base abstract base class for threadpool. -// Threadpool is an executor that maintains a pool of threads sitting around -// and waiting for closures. A threadpool also maintains a queue of pending -// closures, when closures appearing in the queue, the threads in pool will -// pull them out and execute them. -class ThreadPoolInterface { - public: - // Waits for all pending closures to complete, then shuts down thread pool. - virtual ~ThreadPoolInterface() {} - - // Schedules a given closure for execution later. - // Depending on specific subclass implementation, this routine might cause - // current thread to be blocked (in case of unable to schedule). - // Closure should contain a function pointer and arguments it will take, more - // details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h - virtual void Add(grpc_completion_queue_functor* closure) = 0; - - // Returns the current number of pending closures - virtual int num_pending_closures() const = 0; - - // Returns the capacity of pool (number of worker threads in pool) - virtual int pool_capacity() const = 0; - - // Thread option accessor - virtual const Thread::Options& thread_options() const = 0; - - // Returns the thread name for threads in this ThreadPool. - virtual const char* thread_name() const = 0; -}; - -// Worker thread for threadpool. Executes closures in the queue, until getting a -// NULL closure. -class ThreadPoolWorker { - public: - ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue, - Thread::Options& options, int index) - : queue_(queue), thd_name_(thd_name), index_(index) { - thd_ = Thread( - thd_name, [](void* th) { static_cast(th)->Run(); }, - this, nullptr, options); - } - - ~ThreadPoolWorker() {} - - void Start() { thd_.Start(); } - void Join() { thd_.Join(); } - - private: - // struct for tracking stats of thread - struct Stats { - gpr_timespec sleep_time; - Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); } - }; - - void Run(); // Pulls closures from queue and executes them - - MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from - Thread thd_; // Thread wrapped in - Stats stats_; // Stats to be collected in run time - const char* thd_name_; // Name of thread - int index_; // Index in thread pool -}; - -// A fixed size thread pool implementation of abstract thread pool interface. -// In this implementation, the number of threads in pool is fixed, but the -// capacity of closure queue is unlimited. -class ThreadPool : public ThreadPoolInterface { - public: - // Creates a thread pool with size of "num_threads", with default thread name - // "ThreadPoolWorker" and all thread options set to default. If the given size - // is 0 or less, there will be 1 worker thread created inside pool. - explicit ThreadPool(int num_threads); - - // Same as ThreadPool(int num_threads) constructor, except - // that it also sets "thd_name" as the name of all threads in the thread pool. - ThreadPool(int num_threads, const char* thd_name); - - // Same as ThreadPool(const char *thd_name, int num_threads) constructor, - // except that is also set thread_options for threads. - // Notes for stack size: - // If the stack size field of the passed in Thread::Options is set to default - // value 0, default ThreadPool stack size will be used. The current default - // stack size of this implementation is 1952K for mobile platform and 64K for - // all others. - ThreadPool(int num_threads, const char* thd_name, - const Thread::Options& thread_options); - - // Waits for all pending closures to complete, then shuts down thread pool. - ~ThreadPool() override; - - // Adds given closure into pending queue immediately. Since closure queue has - // infinite length, this routine will not block. - void Add(grpc_completion_queue_functor* closure) override; - - int num_pending_closures() const override; - int pool_capacity() const override; - const Thread::Options& thread_options() const override; - const char* thread_name() const override; - - private: - int num_threads_ = 0; - const char* thd_name_ = nullptr; - Thread::Options thread_options_; - ThreadPoolWorker** threads_ = nullptr; // Array of worker threads - MPMCQueueInterface* queue_ = nullptr; // Closure queue - - std::atomic shut_down_{ - false}; // Destructor has been called if set to true - - void SharedThreadPoolConstructor(); - // For ThreadPool, default stack size for mobile platform is 1952K. for other - // platforms is 64K. - size_t DefaultStackSize(); - // Internal Use Only for debug checking. - void AssertHasNotBeenShutDown(); -}; - -} // namespace grpc_core - -#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 3d9e7deeda8..a00678284ea 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -518,8 +518,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_windows.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', - 'src/core/lib/iomgr/executor/mpmcqueue.cc', - 'src/core/lib/iomgr/executor/threadpool.cc', 'src/core/lib/iomgr/fork_posix.cc', 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 9620a335573..56351e5ed7a 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -146,19 +146,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "mpmcqueue_test", - srcs = ["mpmcqueue_test.cc"], - language = "C++", - uses_event_engine = False, - uses_polling = False, - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:grpc_test_util", - ], -) - grpc_cc_test( name = "resolve_address_using_ares_resolver_posix_test", srcs = ["resolve_address_posix_test.cc"], @@ -292,19 +279,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "threadpool_test", - srcs = ["threadpool_test.cc"], - language = "C++", - uses_event_engine = False, - uses_polling = False, - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:grpc_test_util", - ], -) - grpc_cc_test( name = "time_averaged_stats_test", srcs = ["time_averaged_stats_test.cc"], diff --git a/test/core/iomgr/mpmcqueue_test.cc b/test/core/iomgr/mpmcqueue_test.cc deleted file mode 100644 index 76d48d44995..00000000000 --- a/test/core/iomgr/mpmcqueue_test.cc +++ /dev/null @@ -1,227 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/executor/mpmcqueue.h" - -#include - -#include "src/core/lib/gprpp/thd.h" -#include "test/core/util/test_config.h" - -#define TEST_NUM_ITEMS 10000 - -// Testing items for queue -struct WorkItem { - int index; - bool done; - - explicit WorkItem(int i) : index(i) { done = false; } -}; - -// Thread to "produce" items and put items into queue -// It will also check that all items has been marked done and clean up all -// produced items on destructing. -class ProducerThread { - public: - ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index, - int num_items) - : start_index_(start_index), num_items_(num_items), queue_(queue) { - items_ = nullptr; - thd_ = grpc_core::Thread( - "mpmcq_test_producer_thd", - [](void* th) { static_cast(th)->Run(); }, this); - } - ~ProducerThread() { - for (int i = 0; i < num_items_; ++i) { - GPR_ASSERT(items_[i]->done); - delete items_[i]; - } - delete[] items_; - } - - void Start() { thd_.Start(); } - void Join() { thd_.Join(); } - - private: - void Run() { - items_ = new WorkItem*[num_items_]; - for (int i = 0; i < num_items_; ++i) { - items_[i] = new WorkItem(start_index_ + i); - queue_->Put(items_[i]); - } - } - - int start_index_; - int num_items_; - grpc_core::InfLenFIFOQueue* queue_; - grpc_core::Thread thd_; - WorkItem** items_; -}; - -// Thread to pull out items from queue -class ConsumerThread { - public: - explicit ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) { - thd_ = grpc_core::Thread( - "mpmcq_test_consumer_thd", - [](void* th) { static_cast(th)->Run(); }, this); - } - ~ConsumerThread() {} - - void Start() { thd_.Start(); } - void Join() { thd_.Join(); } - - private: - void Run() { - // count number of Get() called in this thread - int count = 0; - - WorkItem* item; - while ((item = static_cast(queue_->Get(nullptr))) != nullptr) { - count++; - GPR_ASSERT(!item->done); - item->done = true; - } - - gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count); - } - grpc_core::InfLenFIFOQueue* queue_; - grpc_core::Thread thd_; -}; - -static void test_FIFO(void) { - gpr_log(GPR_INFO, "test_FIFO"); - grpc_core::InfLenFIFOQueue large_queue; - for (int i = 0; i < TEST_NUM_ITEMS; ++i) { - large_queue.Put(static_cast(new WorkItem(i))); - } - GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS); - for (int i = 0; i < TEST_NUM_ITEMS; ++i) { - WorkItem* item = static_cast(large_queue.Get(nullptr)); - GPR_ASSERT(i == item->index); - delete item; - } -} - -// Test if queue's behavior of expanding is correct. (Only does expansion when -// it gets full, and each time expands to doubled size). -static void test_space_efficiency(void) { - gpr_log(GPR_INFO, "test_space_efficiency"); - grpc_core::InfLenFIFOQueue queue; - for (int i = 0; i < queue.init_num_nodes(); ++i) { - queue.Put(static_cast(new WorkItem(i))); - } - // Queue should not have been expanded at this time. - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); - for (int i = 0; i < queue.init_num_nodes(); ++i) { - WorkItem* item = static_cast(queue.Get(nullptr)); - queue.Put(item); - } - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); - for (int i = 0; i < queue.init_num_nodes(); ++i) { - WorkItem* item = static_cast(queue.Get(nullptr)); - delete item; - } - // Queue never shrinks even it is empty. - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes()); - GPR_ASSERT(queue.count() == 0); - // queue empty now - for (int i = 0; i < queue.init_num_nodes() * 2; ++i) { - queue.Put(static_cast(new WorkItem(i))); - } - GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2); - // Queue should have been expanded once. - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); - for (int i = 0; i < queue.init_num_nodes(); ++i) { - WorkItem* item = static_cast(queue.Get(nullptr)); - delete item; - } - GPR_ASSERT(queue.count() == queue.init_num_nodes()); - // Queue will never shrink, should keep same number of node as before. - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2); - for (int i = 0; i < queue.init_num_nodes() + 1; ++i) { - queue.Put(static_cast(new WorkItem(i))); - } - GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1); - // Queue should have been expanded twice. - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4); - for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) { - WorkItem* item = static_cast(queue.Get(nullptr)); - delete item; - } - GPR_ASSERT(queue.count() == 0); - GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4); - gpr_log(GPR_DEBUG, "Done."); -} - -static void test_many_thread(void) { - gpr_log(GPR_INFO, "test_many_thread"); - const int num_producer_threads = 10; - const int num_consumer_threads = 20; - grpc_core::InfLenFIFOQueue queue; - ProducerThread** producer_threads = new ProducerThread*[num_producer_threads]; - ConsumerThread** consumer_threads = new ConsumerThread*[num_consumer_threads]; - - gpr_log(GPR_DEBUG, "Fork ProducerThreads..."); - for (int i = 0; i < num_producer_threads; ++i) { - producer_threads[i] = - new ProducerThread(&queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS); - producer_threads[i]->Start(); - } - gpr_log(GPR_DEBUG, "ProducerThreads Started."); - gpr_log(GPR_DEBUG, "Fork ConsumerThreads..."); - for (int i = 0; i < num_consumer_threads; ++i) { - consumer_threads[i] = new ConsumerThread(&queue); - consumer_threads[i]->Start(); - } - gpr_log(GPR_DEBUG, "ConsumerThreads Started."); - gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish..."); - for (int i = 0; i < num_producer_threads; ++i) { - producer_threads[i]->Join(); - } - gpr_log(GPR_DEBUG, "All ProducerThreads Terminated."); - gpr_log(GPR_DEBUG, "Terminating ConsumerThreads..."); - for (int i = 0; i < num_consumer_threads; ++i) { - queue.Put(nullptr); - } - for (int i = 0; i < num_consumer_threads; ++i) { - consumer_threads[i]->Join(); - } - gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated."); - gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up..."); - for (int i = 0; i < num_producer_threads; ++i) { - // Destructor of ProducerThread will do the check of WorkItems - delete producer_threads[i]; - } - delete[] producer_threads; - for (int i = 0; i < num_consumer_threads; ++i) { - delete consumer_threads[i]; - } - delete[] consumer_threads; - gpr_log(GPR_DEBUG, "Done."); -} - -int main(int argc, char** argv) { - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - test_FIFO(); - test_space_efficiency(); - test_many_thread(); - grpc_shutdown(); - return 0; -} diff --git a/test/core/iomgr/threadpool_test.cc b/test/core/iomgr/threadpool_test.cc deleted file mode 100644 index dd6b405307a..00000000000 --- a/test/core/iomgr/threadpool_test.cc +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/executor/threadpool.h" - -#include "test/core/util/test_config.h" - -static const int kSmallThreadPoolSize = 20; -static const int kLargeThreadPoolSize = 100; -static const int kThreadSmallIter = 100; -static const int kThreadLargeIter = 10000; - -static void test_size_zero(void) { - gpr_log(GPR_INFO, "test_size_zero"); - grpc_core::ThreadPool* pool_size_zero = new grpc_core::ThreadPool(0); - GPR_ASSERT(pool_size_zero->pool_capacity() == 1); - delete pool_size_zero; -} - -static void test_constructor_option(void) { - gpr_log(GPR_INFO, "test_constructor_option"); - // Tests options - grpc_core::Thread::Options options; - options.set_stack_size(192 * 1024); // Random non-default value - grpc_core::ThreadPool* pool = - new grpc_core::ThreadPool(0, "test_constructor_option", options); - GPR_ASSERT(pool->thread_options().stack_size() == options.stack_size()); - delete pool; -} - -// Simple functor for testing. It will count how many times being called. -class SimpleFunctorForAdd : public grpc_completion_queue_functor { - public: - friend class SimpleFunctorCheckForAdd; - SimpleFunctorForAdd() { - functor_run = &SimpleFunctorForAdd::Run; - inlineable = true; - internal_next = this; - internal_success = 0; - } - ~SimpleFunctorForAdd() {} - static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) { - auto* callback = static_cast(cb); - callback->count_.fetch_add(1, std::memory_order_relaxed); - } - - int count() { return count_.load(std::memory_order_relaxed); } - - private: - std::atomic count_{0}; -}; - -static void test_add(void) { - gpr_log(GPR_INFO, "test_add"); - grpc_core::ThreadPool* pool = - new grpc_core::ThreadPool(kSmallThreadPoolSize, "test_add"); - - SimpleFunctorForAdd* functor = new SimpleFunctorForAdd(); - for (int i = 0; i < kThreadSmallIter; ++i) { - pool->Add(functor); - } - delete pool; - GPR_ASSERT(functor->count() == kThreadSmallIter); - delete functor; - gpr_log(GPR_DEBUG, "Done."); -} - -// Thread that adds closures to pool -class WorkThread { - public: - WorkThread(grpc_core::ThreadPool* pool, SimpleFunctorForAdd* cb, int num_add) - : num_add_(num_add), cb_(cb), pool_(pool) { - thd_ = grpc_core::Thread( - "thread_pool_test_add_thd", - [](void* th) { static_cast(th)->Run(); }, this); - } - ~WorkThread() {} - - void Start() { thd_.Start(); } - void Join() { thd_.Join(); } - - private: - void Run() { - for (int i = 0; i < num_add_; ++i) { - pool_->Add(cb_); - } - } - - int num_add_; - SimpleFunctorForAdd* cb_; - grpc_core::ThreadPool* pool_; - grpc_core::Thread thd_; -}; - -static void test_multi_add(void) { - gpr_log(GPR_INFO, "test_multi_add"); - const int num_work_thds = 10; - grpc_core::ThreadPool* pool = - new grpc_core::ThreadPool(kLargeThreadPoolSize, "test_multi_add"); - SimpleFunctorForAdd* functor = new SimpleFunctorForAdd(); - WorkThread** work_thds = static_cast( - gpr_zalloc(sizeof(WorkThread*) * num_work_thds)); - gpr_log(GPR_DEBUG, "Fork threads for adding..."); - for (int i = 0; i < num_work_thds; ++i) { - work_thds[i] = new WorkThread(pool, functor, kThreadLargeIter); - work_thds[i]->Start(); - } - // Wait for all threads finish - gpr_log(GPR_DEBUG, "Waiting for all work threads finish..."); - for (int i = 0; i < num_work_thds; ++i) { - work_thds[i]->Join(); - delete work_thds[i]; - } - gpr_free(work_thds); - gpr_log(GPR_DEBUG, "Done."); - gpr_log(GPR_DEBUG, "Waiting for all closures finish..."); - // Destructor of thread pool will wait for all closures to finish - delete pool; - GPR_ASSERT(functor->count() == kThreadLargeIter * num_work_thds); - delete functor; - gpr_log(GPR_DEBUG, "Done."); -} - -// Checks the current count with a given number. -class SimpleFunctorCheckForAdd : public grpc_completion_queue_functor { - public: - SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) { - functor_run = &SimpleFunctorCheckForAdd::Run; - inlineable = true; - internal_success = ok; - } - ~SimpleFunctorCheckForAdd() {} - static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) { - auto* callback = static_cast(cb); - (*callback->count_)++; - GPR_ASSERT(*callback->count_ == callback->internal_success); - } - - private: - int* count_; -}; - -static void test_one_thread_FIFO(void) { - gpr_log(GPR_INFO, "test_one_thread_FIFO"); - int counter = 0; - grpc_core::ThreadPool* pool = - new grpc_core::ThreadPool(1, "test_one_thread_FIFO"); - SimpleFunctorCheckForAdd** check_functors = - static_cast( - gpr_zalloc(sizeof(SimpleFunctorCheckForAdd*) * kThreadSmallIter)); - for (int i = 0; i < kThreadSmallIter; ++i) { - check_functors[i] = new SimpleFunctorCheckForAdd(i + 1, &counter); - pool->Add(check_functors[i]); - } - // Destructor of pool will wait until all closures finished. - delete pool; - for (int i = 0; i < kThreadSmallIter; ++i) { - delete check_functors[i]; - } - gpr_free(check_functors); - gpr_log(GPR_DEBUG, "Done."); -} - -int main(int argc, char** argv) { - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - test_size_zero(); - test_constructor_option(); - test_add(); - test_multi_add(); - test_one_thread_FIFO(); - grpc_shutdown(); - return 0; -} diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index 1ef47a2dc6c..259457b0343 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -289,21 +289,6 @@ grpc_cc_test( deps = [":helpers"], ) -grpc_cc_test( - name = "bm_threadpool", - size = "large", - srcs = ["bm_threadpool.cc"], - args = grpc_benchmark_args(), - tags = [ - "manual", - "no_windows", - "notap", - ], - uses_event_engine = False, - uses_polling = False, - deps = [":helpers"], -) - grpc_cc_library( name = "bm_callback_test_service_impl", testonly = 1, diff --git a/test/cpp/microbenchmarks/bm_threadpool.cc b/test/cpp/microbenchmarks/bm_threadpool.cc deleted file mode 100644 index 9fb6b17cab8..00000000000 --- a/test/cpp/microbenchmarks/bm_threadpool.cc +++ /dev/null @@ -1,331 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -#include - -#include - -#include "src/core/lib/iomgr/executor/threadpool.h" -#include "test/core/util/test_config.h" -#include "test/cpp/microbenchmarks/helpers.h" -#include "test/cpp/util/test_config.h" - -namespace grpc { -namespace testing { - -// This helper class allows a thread to block for a pre-specified number of -// actions. BlockingCounter has an initial non-negative count on initialization. -// Each call to DecrementCount will decrease the count by 1. When making a call -// to Wait, if the count is greater than 0, the thread will be blocked, until -// the count reaches 0. -class BlockingCounter { - public: - explicit BlockingCounter(int count) : count_(count) {} - void DecrementCount() { - std::lock_guard l(mu_); - count_--; - if (count_ == 0) cv_.notify_all(); - } - - void Wait() { - std::unique_lock l(mu_); - while (count_ > 0) { - cv_.wait(l); - } - } - - private: - int count_; - std::mutex mu_; - std::condition_variable cv_; -}; - -// This is a functor/closure class for threadpool microbenchmark. -// This functor (closure) class will add another functor into pool if the -// number passed in (num_add) is greater than 0. Otherwise, it will decrement -// the counter to indicate that task is finished. This functor will suicide at -// the end, therefore, no need for caller to do clean-ups. -class AddAnotherFunctor : public grpc_completion_queue_functor { - public: - AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter, - int num_add) - : pool_(pool), counter_(counter), num_add_(num_add) { - functor_run = &AddAnotherFunctor::Run; - inlineable = false; - internal_next = this; - internal_success = 0; - } - // When the functor gets to run in thread pool, it will take itself as first - // argument and internal_success as second one. - static void Run(grpc_completion_queue_functor* cb, int /*ok*/) { - auto* callback = static_cast(cb); - if (--callback->num_add_ > 0) { - callback->pool_->Add(new AddAnotherFunctor( - callback->pool_, callback->counter_, callback->num_add_)); - } else { - callback->counter_->DecrementCount(); - } - // Suicides. - delete callback; - } - - private: - grpc_core::ThreadPool* pool_; - BlockingCounter* counter_; - int num_add_; -}; - -template -static void ThreadPoolAddAnother(benchmark::State& state) { - const int num_iterations = state.range(0); - const int num_threads = state.range(1); - // Number of adds done by each closure. - const int num_add = num_iterations / kConcurrentFunctor; - grpc_core::ThreadPool pool(num_threads); - while (state.KeepRunningBatch(num_iterations)) { - BlockingCounter counter(kConcurrentFunctor); - for (int i = 0; i < kConcurrentFunctor; ++i) { - pool.Add(new AddAnotherFunctor(&pool, &counter, num_add)); - } - counter.Wait(); - } - state.SetItemsProcessed(state.iterations()); -} - -// First pair of arguments is range for number of iterations (num_iterations). -// Second pair of arguments is range for thread pool size (num_threads). -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 1)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 4)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 8)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 16) - ->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 32) - ->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 64) - ->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 128) - ->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 512) - ->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 2048) - ->RangePair(524288, 524288, 1, 1024); - -// A functor class that will delete self on end of running. -class SuicideFunctorForAdd : public grpc_completion_queue_functor { - public: - explicit SuicideFunctorForAdd(BlockingCounter* counter) : counter_(counter) { - functor_run = &SuicideFunctorForAdd::Run; - inlineable = false; - internal_next = this; - internal_success = 0; - } - - static void Run(grpc_completion_queue_functor* cb, int /*ok*/) { - // On running, the first argument would be itself. - auto* callback = static_cast(cb); - callback->counter_->DecrementCount(); - delete callback; - } - - private: - BlockingCounter* counter_; -}; - -// Performs the scenario of external thread(s) adding closures into pool. -static void BM_ThreadPoolExternalAdd(benchmark::State& state) { - static grpc_core::ThreadPool* external_add_pool = nullptr; - int thread_idx = state.thread_index(); - // Setup for each run of test. - if (thread_idx == 0) { - const int num_threads = state.range(1); - external_add_pool = new grpc_core::ThreadPool(num_threads); - } - const int num_iterations = state.range(0) / state.threads(); - while (state.KeepRunningBatch(num_iterations)) { - BlockingCounter counter(num_iterations); - for (int i = 0; i < num_iterations; ++i) { - external_add_pool->Add(new SuicideFunctorForAdd(&counter)); - } - counter.Wait(); - } - - // Teardown at the end of each test run. - if (thread_idx == 0) { - state.SetItemsProcessed(state.range(0)); - delete external_add_pool; - } -} -BENCHMARK(BM_ThreadPoolExternalAdd) - // First pair is range for number of iterations (num_iterations). - // Second pair is range for thread pool size (num_threads). - ->RangePair(524288, 524288, 1, 1024) - ->ThreadRange(1, 256); // Concurrent external thread(s) up to 256 - -// Functor (closure) that adds itself into pool repeatedly. By adding self, the -// overhead would be low and can measure the time of add more accurately. -class AddSelfFunctor : public grpc_completion_queue_functor { - public: - AddSelfFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter, - int num_add) - : pool_(pool), counter_(counter), num_add_(num_add) { - functor_run = &AddSelfFunctor::Run; - inlineable = false; - internal_next = this; - internal_success = 0; - } - // When the functor gets to run in thread pool, it will take itself as first - // argument and internal_success as second one. - static void Run(grpc_completion_queue_functor* cb, int /*ok*/) { - auto* callback = static_cast(cb); - if (--callback->num_add_ > 0) { - callback->pool_->Add(cb); - } else { - callback->counter_->DecrementCount(); - // Suicides. - delete callback; - } - } - - private: - grpc_core::ThreadPool* pool_; - BlockingCounter* counter_; - int num_add_; -}; - -template -static void ThreadPoolAddSelf(benchmark::State& state) { - const int num_iterations = state.range(0); - const int num_threads = state.range(1); - // Number of adds done by each closure. - const int num_add = num_iterations / kConcurrentFunctor; - grpc_core::ThreadPool pool(num_threads); - while (state.KeepRunningBatch(num_iterations)) { - BlockingCounter counter(kConcurrentFunctor); - for (int i = 0; i < kConcurrentFunctor; ++i) { - pool.Add(new AddSelfFunctor(&pool, &counter, num_add)); - } - counter.Wait(); - } - state.SetItemsProcessed(state.iterations()); -} - -// First pair of arguments is range for number of iterations (num_iterations). -// Second pair of arguments is range for thread pool size (num_threads). -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 1)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 4)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 8)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 16)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 32)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 64)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 128)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 512)->RangePair(524288, 524288, 1, 1024); -BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024); - -#if defined(__GNUC__) && !defined(SWIG) -#if defined(__i386__) || defined(__x86_64__) -#define CACHELINE_SIZE 64 -#elif defined(__powerpc64__) -#define CACHELINE_SIZE 128 -#elif defined(__aarch64__) -#define CACHELINE_SIZE 64 -#elif defined(__arm__) -#if defined(__ARM_ARCH_5T__) -#define CACHELINE_SIZE 32 -#elif defined(__ARM_ARCH_7A__) -#define CACHELINE_SIZE 64 -#endif -#endif -#ifndef CACHELINE_SIZE -#define CACHELINE_SIZE 64 -#endif -#endif - -// A functor (closure) that simulates closures with small but non-trivial amount -// of work. -class ShortWorkFunctorForAdd : public grpc_completion_queue_functor { - public: - BlockingCounter* counter_; - - ShortWorkFunctorForAdd() { - functor_run = &ShortWorkFunctorForAdd::Run; - inlineable = false; - internal_next = this; - internal_success = 0; - val_ = 0; - } - static void Run(grpc_completion_queue_functor* cb, int /*ok*/) { - auto* callback = static_cast(cb); - // Uses pad to avoid compiler complaining unused variable error. - callback->pad[0] = 0; - for (int i = 0; i < 1000; ++i) { - callback->val_++; - } - callback->counter_->DecrementCount(); - } - - private: - char pad[CACHELINE_SIZE]; - volatile int val_; -}; - -// Simulates workloads where many short running callbacks are added to the -// threadpool. The callbacks are not enough to keep all the workers busy -// continuously so the number of workers running changes overtime. -// -// In effect this tests how well the threadpool avoids spurious wakeups. -static void BM_SpikyLoad(benchmark::State& state) { - const int num_threads = state.range(0); - - const int kNumSpikes = 1000; - const int batch_size = 3 * num_threads; - std::vector work_vector(batch_size); - grpc_core::ThreadPool pool(num_threads); - while (state.KeepRunningBatch(kNumSpikes * batch_size)) { - for (int i = 0; i != kNumSpikes; ++i) { - BlockingCounter counter(batch_size); - for (auto& w : work_vector) { - w.counter_ = &counter; - pool.Add(&w); - } - counter.Wait(); - } - } - state.SetItemsProcessed(state.iterations() * batch_size); -} -BENCHMARK(BM_SpikyLoad)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16); - -} // namespace testing -} // namespace grpc - -// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, -// and others do not. This allows us to support both modes. -namespace benchmark { -void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } -} // namespace benchmark - -int main(int argc, char* argv[]) { - grpc::testing::TestEnvironment env(&argc, argv); - LibraryInitializer libInit; - ::benchmark::Initialize(&argc, argv); - grpc::testing::InitTest(&argc, &argv, false); - benchmark::RunTheBenchmarksNamespaced(); - return 0; -} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 8a735f35bf3..9215aaed7d7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2089,10 +2089,6 @@ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor.h \ -src/core/lib/iomgr/executor/mpmcqueue.cc \ -src/core/lib/iomgr/executor/mpmcqueue.h \ -src/core/lib/iomgr/executor/threadpool.cc \ -src/core/lib/iomgr/executor/threadpool.h \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 7a84f5ef842..7a8d81f5231 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1883,10 +1883,6 @@ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/executor.h \ -src/core/lib/iomgr/executor/mpmcqueue.cc \ -src/core/lib/iomgr/executor/mpmcqueue.h \ -src/core/lib/iomgr/executor/threadpool.cc \ -src/core/lib/iomgr/executor/threadpool.h \ src/core/lib/iomgr/fork_posix.cc \ src/core/lib/iomgr/fork_windows.cc \ src/core/lib/iomgr/gethostname.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index fca70e1d030..a7e2dd55e74 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -823,30 +823,6 @@ ], "uses_polling": false }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "mpmcqueue_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false, @@ -1411,30 +1387,6 @@ ], "uses_polling": false }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "threadpool_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false,