Replace LogicalThread with WorkSerializer

reviewable/pr21846/r1
Yash Tibrewal 5 years ago
parent 1715a811bb
commit 74309886bb
  1. 4
      BUILD
  2. 6
      BUILD.gn
  3. 86
      CMakeLists.txt
  4. 108
      Makefile
  5. 24
      build.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 4
      gRPC-C++.podspec
  9. 6
      gRPC-Core.podspec
  10. 4
      grpc.gemspec
  11. 10
      grpc.gyp
  12. 4
      package.xml
  13. 52
      src/core/lib/iomgr/logical_thread.h
  14. 67
      src/core/lib/iomgr/work_serializer.cc
  15. 74
      src/core/lib/iomgr/work_serializer.h
  16. 2
      src/python/grpcio/grpc_core_dependencies.py
  17. 4
      test/core/iomgr/BUILD
  18. 25
      test/core/iomgr/work_serializer_test.cc
  19. 2
      tools/doxygen/Doxyfile.c++.internal
  20. 4
      tools/doxygen/Doxyfile.core.internal
  21. 48
      tools/run_tests/generated/tests.json

@ -752,7 +752,6 @@ grpc_cc_library(
"src/core/lib/iomgr/is_epollexclusive_available.cc",
"src/core/lib/iomgr/load_file.cc",
"src/core/lib/iomgr/lockfree_event.cc",
"src/core/lib/iomgr/logical_thread.cc",
"src/core/lib/iomgr/polling_entity.cc",
"src/core/lib/iomgr/pollset.cc",
"src/core/lib/iomgr/pollset_custom.cc",
@ -804,6 +803,7 @@ grpc_cc_library(
"src/core/lib/iomgr/wakeup_fd_nospecial.cc",
"src/core/lib/iomgr/wakeup_fd_pipe.cc",
"src/core/lib/iomgr/wakeup_fd_posix.cc",
"src/core/lib/iomgr/work_serializer.cc",
"src/core/lib/json/json.cc",
"src/core/lib/json/json_reader.cc",
"src/core/lib/json/json_reader_new.cc",
@ -906,7 +906,6 @@ grpc_cc_library(
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
"src/core/lib/iomgr/logical_thread.h",
"src/core/lib/iomgr/nameser.h",
"src/core/lib/iomgr/polling_entity.h",
"src/core/lib/iomgr/pollset.h",
@ -949,6 +948,7 @@ grpc_cc_library(
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/work_serializer.h",
"src/core/lib/json/json.h",
"src/core/lib/slice/b64.h",
"src/core/lib/slice/percent_encoding.h",

@ -620,8 +620,6 @@ config("grpc_config") {
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.cc",
"src/core/lib/iomgr/lockfree_event.h",
"src/core/lib/iomgr/logical_thread.cc",
"src/core/lib/iomgr/logical_thread.h",
"src/core/lib/iomgr/nameser.h",
"src/core/lib/iomgr/poller/eventmanager_libuv.cc",
"src/core/lib/iomgr/poller/eventmanager_libuv.h",
@ -715,6 +713,8 @@ config("grpc_config") {
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.cc",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/work_serializer.cc",
"src/core/lib/iomgr/work_serializer.h",
"src/core/lib/json/json.cc",
"src/core/lib/json/json.h",
"src/core/lib/json/json_reader.cc",
@ -1313,7 +1313,6 @@ config("grpc_config") {
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
"src/core/lib/iomgr/logical_thread.h",
"src/core/lib/iomgr/nameser.h",
"src/core/lib/iomgr/poller/eventmanager_libuv.h",
"src/core/lib/iomgr/polling_entity.h",
@ -1354,6 +1353,7 @@ config("grpc_config") {
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/work_serializer.h",
"src/core/lib/json/json.h",
"src/core/lib/profiling/timers.h",
"src/core/lib/slice/b64.h",

@ -827,7 +827,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx json_run_localhost)
endif()
add_dependencies(buildtests_cxx json_test_new)
add_dependencies(buildtests_cxx logical_thread_test)
add_dependencies(buildtests_cxx message_allocator_end2end_test)
add_dependencies(buildtests_cxx metrics_client)
add_dependencies(buildtests_cxx mock_test)
@ -894,6 +893,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx transport_connectivity_state_test)
add_dependencies(buildtests_cxx transport_pid_controller_test)
add_dependencies(buildtests_cxx transport_security_common_api_test)
add_dependencies(buildtests_cxx work_serializer_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx writes_per_rpc_test)
endif()
@ -1070,7 +1070,6 @@ add_library(alts_test_util
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -1124,6 +1123,7 @@ add_library(alts_test_util
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -1560,7 +1560,6 @@ add_library(grpc
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -1614,6 +1613,7 @@ add_library(grpc
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -2046,7 +2046,6 @@ add_library(grpc_cronet
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -2100,6 +2099,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -2477,7 +2477,6 @@ add_library(grpc_test_util
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -2531,6 +2530,7 @@ add_library(grpc_test_util
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -2822,7 +2822,6 @@ add_library(grpc_test_util_unsecure
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -2876,6 +2875,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -3143,7 +3143,6 @@ add_library(grpc_unsecure
src/core/lib/iomgr/is_epollexclusive_available.cc
src/core/lib/iomgr/load_file.cc
src/core/lib/iomgr/lockfree_event.cc
src/core/lib/iomgr/logical_thread.cc
src/core/lib/iomgr/poller/eventmanager_libuv.cc
src/core/lib/iomgr/polling_entity.cc
src/core/lib/iomgr/pollset.cc
@ -3197,6 +3196,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/wakeup_fd_nospecial.cc
src/core/lib/iomgr/wakeup_fd_pipe.cc
src/core/lib/iomgr/wakeup_fd_posix.cc
src/core/lib/iomgr/work_serializer.cc
src/core/lib/json/json.cc
src/core/lib/json/json_reader.cc
src/core/lib/json/json_reader_new.cc
@ -14050,42 +14050,6 @@ target_link_libraries(json_test_new
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(logical_thread_test
test/core/iomgr/logical_thread_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(logical_thread_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(logical_thread_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)
@ -16061,6 +16025,42 @@ target_link_libraries(transport_security_common_api_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(work_serializer_test
test/core/iomgr/work_serializer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(work_serializer_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(work_serializer_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -1257,7 +1257,6 @@ interop_server: $(BINDIR)/$(CONFIG)/interop_server
interop_test: $(BINDIR)/$(CONFIG)/interop_test
json_run_localhost: $(BINDIR)/$(CONFIG)/json_run_localhost
json_test_new: $(BINDIR)/$(CONFIG)/json_test_new
logical_thread_test: $(BINDIR)/$(CONFIG)/logical_thread_test
message_allocator_end2end_test: $(BINDIR)/$(CONFIG)/message_allocator_end2end_test
metrics_client: $(BINDIR)/$(CONFIG)/metrics_client
mock_test: $(BINDIR)/$(CONFIG)/mock_test
@ -1308,6 +1307,7 @@ timer_test: $(BINDIR)/$(CONFIG)/timer_test
transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test
work_serializer_test: $(BINDIR)/$(CONFIG)/work_serializer_test
writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test
xds_bootstrap_test: $(BINDIR)/$(CONFIG)/xds_bootstrap_test
xds_end2end_test: $(BINDIR)/$(CONFIG)/xds_end2end_test
@ -1728,7 +1728,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/interop_test \
$(BINDIR)/$(CONFIG)/json_run_localhost \
$(BINDIR)/$(CONFIG)/json_test_new \
$(BINDIR)/$(CONFIG)/logical_thread_test \
$(BINDIR)/$(CONFIG)/message_allocator_end2end_test \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
@ -1779,6 +1778,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/work_serializer_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/xds_bootstrap_test \
$(BINDIR)/$(CONFIG)/xds_end2end_test \
@ -1904,7 +1904,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/interop_test \
$(BINDIR)/$(CONFIG)/json_run_localhost \
$(BINDIR)/$(CONFIG)/json_test_new \
$(BINDIR)/$(CONFIG)/logical_thread_test \
$(BINDIR)/$(CONFIG)/message_allocator_end2end_test \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
@ -1955,6 +1954,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/work_serializer_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/xds_bootstrap_test \
$(BINDIR)/$(CONFIG)/xds_end2end_test \
@ -2419,8 +2419,6 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 )
$(E) "[RUN] Testing json_test_new"
$(Q) $(BINDIR)/$(CONFIG)/json_test_new || ( echo test json_test_new failed ; exit 1 )
$(E) "[RUN] Testing logical_thread_test"
$(Q) $(BINDIR)/$(CONFIG)/logical_thread_test || ( echo test logical_thread_test failed ; exit 1 )
$(E) "[RUN] Testing message_allocator_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/message_allocator_end2end_test || ( echo test message_allocator_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing mock_test"
@ -2505,6 +2503,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_common_api_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_security_common_api_test || ( echo test transport_security_common_api_test failed ; exit 1 )
$(E) "[RUN] Testing work_serializer_test"
$(Q) $(BINDIR)/$(CONFIG)/work_serializer_test || ( echo test work_serializer_test failed ; exit 1 )
$(E) "[RUN] Testing writes_per_rpc_test"
$(Q) $(BINDIR)/$(CONFIG)/writes_per_rpc_test || ( echo test writes_per_rpc_test failed ; exit 1 )
$(E) "[RUN] Testing xds_bootstrap_test"
@ -3584,7 +3584,6 @@ LIBALTS_TEST_UTIL_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -3638,6 +3637,7 @@ LIBALTS_TEST_UTIL_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -4042,7 +4042,6 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -4096,6 +4095,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -4520,7 +4520,6 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -4574,6 +4573,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -4942,7 +4942,6 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -4996,6 +4995,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -5273,7 +5273,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -5327,6 +5326,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -5567,7 +5567,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -5621,6 +5620,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \
@ -18410,49 +18410,6 @@ endif
endif
LOGICAL_THREAD_TEST_SRC = \
test/core/iomgr/logical_thread_test.cc \
LOGICAL_THREAD_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LOGICAL_THREAD_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/logical_thread_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/logical_thread_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/logical_thread_test: $(PROTOBUF_DEP) $(LOGICAL_THREAD_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(LOGICAL_THREAD_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/logical_thread_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/logical_thread_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_logical_thread_test: $(LOGICAL_THREAD_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(LOGICAL_THREAD_TEST_OBJS:.o=.dep)
endif
endif
MESSAGE_ALLOCATOR_END2END_TEST_SRC = \
test/cpp/end2end/message_allocator_end2end_test.cc \
@ -20677,6 +20634,49 @@ endif
endif
WORK_SERIALIZER_TEST_SRC = \
test/core/iomgr/work_serializer_test.cc \
WORK_SERIALIZER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WORK_SERIALIZER_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/work_serializer_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/work_serializer_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/work_serializer_test: $(PROTOBUF_DEP) $(WORK_SERIALIZER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(WORK_SERIALIZER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/work_serializer_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/work_serializer_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_work_serializer_test: $(WORK_SERIALIZER_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(WORK_SERIALIZER_TEST_OBJS:.o=.dep)
endif
endif
WRITES_PER_RPC_TEST_SRC = \
test/cpp/performance/writes_per_rpc_test.cc \

@ -716,7 +716,6 @@ filegroups:
- src/core/lib/iomgr/is_epollexclusive_available.cc
- src/core/lib/iomgr/load_file.cc
- src/core/lib/iomgr/lockfree_event.cc
- src/core/lib/iomgr/logical_thread.cc
- src/core/lib/iomgr/poller/eventmanager_libuv.cc
- src/core/lib/iomgr/polling_entity.cc
- src/core/lib/iomgr/pollset.cc
@ -770,6 +769,7 @@ filegroups:
- src/core/lib/iomgr/wakeup_fd_nospecial.cc
- src/core/lib/iomgr/wakeup_fd_pipe.cc
- src/core/lib/iomgr/wakeup_fd_posix.cc
- src/core/lib/iomgr/work_serializer.cc
- src/core/lib/json/json.cc
- src/core/lib/json/json_reader.cc
- src/core/lib/json/json_reader_new.cc
@ -898,7 +898,6 @@ filegroups:
- src/core/lib/iomgr/is_epollexclusive_available.h
- src/core/lib/iomgr/load_file.h
- src/core/lib/iomgr/lockfree_event.h
- src/core/lib/iomgr/logical_thread.h
- src/core/lib/iomgr/nameser.h
- src/core/lib/iomgr/poller/eventmanager_libuv.h
- src/core/lib/iomgr/polling_entity.h
@ -939,6 +938,7 @@ filegroups:
- src/core/lib/iomgr/unix_sockets_posix.h
- src/core/lib/iomgr/wakeup_fd_pipe.h
- src/core/lib/iomgr/wakeup_fd_posix.h
- src/core/lib/iomgr/work_serializer.h
- src/core/lib/json/json.h
- src/core/lib/slice/b64.h
- src/core/lib/slice/percent_encoding.h
@ -5406,16 +5406,6 @@ targets:
- grpc
- gpr
uses_polling: false
- name: logical_thread_test
cpu_cost: 10
build: test
language: c++
src:
- test/core/iomgr/logical_thread_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: message_allocator_end2end_test
gtest: true
cpu_cost: 0.5
@ -6087,6 +6077,16 @@ targets:
- alts_test_util
- gpr
- grpc
- name: work_serializer_test
cpu_cost: 10
build: test
language: c++
src:
- test/core/iomgr/work_serializer_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: writes_per_rpc_test
gtest: true
cpu_cost: 0.5

@ -292,7 +292,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/is_epollexclusive_available.cc \
src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/polling_entity.cc \
src/core/lib/iomgr/pollset.cc \
@ -346,6 +345,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/wakeup_fd_nospecial.cc \
src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/json/json.cc \
src/core/lib/json/json_reader.cc \
src/core/lib/json/json_reader_new.cc \

@ -261,7 +261,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\is_epollexclusive_available.cc " +
"src\\core\\lib\\iomgr\\load_file.cc " +
"src\\core\\lib\\iomgr\\lockfree_event.cc " +
"src\\core\\lib\\iomgr\\logical_thread.cc " +
"src\\core\\lib\\iomgr\\poller\\eventmanager_libuv.cc " +
"src\\core\\lib\\iomgr\\polling_entity.cc " +
"src\\core\\lib\\iomgr\\pollset.cc " +
@ -315,6 +314,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\wakeup_fd_nospecial.cc " +
"src\\core\\lib\\iomgr\\wakeup_fd_pipe.cc " +
"src\\core\\lib\\iomgr\\wakeup_fd_posix.cc " +
"src\\core\\lib\\iomgr\\work_serializer.cc " +
"src\\core\\lib\\json\\json.cc " +
"src\\core\\lib\\json\\json_reader.cc " +
"src\\core\\lib\\json\\json_reader_new.cc " +

@ -443,7 +443,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
'src/core/lib/iomgr/logical_thread.h',
'src/core/lib/iomgr/nameser.h',
'src/core/lib/iomgr/poller/eventmanager_libuv.h',
'src/core/lib/iomgr/polling_entity.h',
@ -484,6 +483,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/unix_sockets_posix.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/work_serializer.h',
'src/core/lib/json/json.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/security/context/security_context.h',
@ -875,7 +875,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
'src/core/lib/iomgr/logical_thread.h',
'src/core/lib/iomgr/nameser.h',
'src/core/lib/iomgr/poller/eventmanager_libuv.h',
'src/core/lib/iomgr/polling_entity.h',
@ -916,6 +915,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/unix_sockets_posix.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/work_serializer.h',
'src/core/lib/json/json.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/security/context/security_context.h',

@ -655,8 +655,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/lockfree_event.h',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/logical_thread.h',
'src/core/lib/iomgr/nameser.h',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.h',
@ -750,6 +748,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/iomgr/work_serializer.h',
'src/core/lib/json/json.cc',
'src/core/lib/json/json.h',
'src/core/lib/json/json_reader.cc',
@ -1204,7 +1204,6 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
'src/core/lib/iomgr/logical_thread.h',
'src/core/lib/iomgr/nameser.h',
'src/core/lib/iomgr/poller/eventmanager_libuv.h',
'src/core/lib/iomgr/polling_entity.h',
@ -1245,6 +1244,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/unix_sockets_posix.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/work_serializer.h',
'src/core/lib/json/json.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/security/context/security_context.h',

@ -578,8 +578,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/load_file.h )
s.files += %w( src/core/lib/iomgr/lockfree_event.cc )
s.files += %w( src/core/lib/iomgr/lockfree_event.h )
s.files += %w( src/core/lib/iomgr/logical_thread.cc )
s.files += %w( src/core/lib/iomgr/logical_thread.h )
s.files += %w( src/core/lib/iomgr/nameser.h )
s.files += %w( src/core/lib/iomgr/poller/eventmanager_libuv.cc )
s.files += %w( src/core/lib/iomgr/poller/eventmanager_libuv.h )
@ -673,6 +671,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_posix.cc )
s.files += %w( src/core/lib/iomgr/wakeup_fd_posix.h )
s.files += %w( src/core/lib/iomgr/work_serializer.cc )
s.files += %w( src/core/lib/iomgr/work_serializer.h )
s.files += %w( src/core/lib/json/json.cc )
s.files += %w( src/core/lib/json/json.h )
s.files += %w( src/core/lib/json/json_reader.cc )

@ -263,7 +263,6 @@
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -317,6 +316,7 @@
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',
@ -563,7 +563,6 @@
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -617,6 +616,7 @@
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',
@ -988,7 +988,6 @@
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -1042,6 +1041,7 @@
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',
@ -1253,7 +1253,6 @@
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -1307,6 +1306,7 @@
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',
@ -1494,7 +1494,6 @@
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -1548,6 +1547,7 @@
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',

@ -561,8 +561,6 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/load_file.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/lockfree_event.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/lockfree_event.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/logical_thread.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/logical_thread.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/nameser.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/poller/eventmanager_libuv.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/poller/eventmanager_libuv.h" role="src" />
@ -656,6 +654,8 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/work_serializer.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/work_serializer.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/json/json.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/json/json.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/json/json_reader.cc" role="src" />

@ -1,52 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <functional>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/ref_counted.h"
#ifndef GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
#define GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_logical_thread_trace;
// LogicalThread is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a LogicalThread instance will be executed serially
// in a borrowed thread. The API provides a FIFO guarantee to the execution of
// callbacks scheduled on the thread.
class LogicalThread : public RefCounted<LogicalThread> {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
private:
void DrainQueue();
Atomic<size_t> size_{0};
MultiProducerSingleConsumerQueue queue_;
};
} /* namespace grpc_core */
#endif /* GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H */

@ -18,11 +18,11 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread");
DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
@ -33,51 +33,74 @@ struct CallbackWrapper {
const DebugLocation location;
};
void LogicalThread::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]",
void WorkSerializerImpl::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
}
const size_t prev_size = size_.FetchAdd(1);
if (prev_size == 0) {
// There is no other closure executing right now on this logical thread.
// The work serializer should not have been orphaned.
GPR_DEBUG_ASSERT(prev_size > 0);
if (prev_size == 1) {
// There is no other closure executing right now on this work serializer.
// Execute this closure immediately.
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
callback();
// Loan this thread to the logical thread and drain the queue.
// Loan this thread to the work serializer thread and drain the queue.
DrainQueue();
} else {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
// There already are closures executing on this logical thread. Simply add
// There already are closures executing on this work serializer. Simply add
// this closure to the queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
}
queue_.Push(&cb_wrapper->mpscq_node);
}
}
// The thread that calls this loans itself to the logical thread so as to
void WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
size_t prev_size = size_.FetchSub(1);
if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
delete this;
}
}
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callback. This is called from within
// LogicalThread::Run() after executing a callback immediately, and hence size_
// WorkSerializer::Run() after executing a callback immediately, and hence size_
// is atleast 1.
void LogicalThread::DrainQueue() {
void WorkSerializerImpl::DrainQueue() {
while (true) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
size_t prev_size = size_.FetchSub(1);
// prev_size should be atleast 1 since
GPR_DEBUG_ASSERT(prev_size >= 1);
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the work serializer. In that case, delete the object.
if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained. Destroying");
}
delete this;
return;
}
if (prev_size == 2) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained");
}
break;
return;
}
// There is atleast one callback on the queue. Pop the callback from the
// queue and execute it.
@ -87,11 +110,11 @@ void LogicalThread::DrainQueue() {
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
// This can happen either due to a race condition within the mpscq
// implementation or because of a race with Run()
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
cb_wrapper, cb_wrapper->location.file(),
cb_wrapper->location.line());

@ -0,0 +1,74 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <functional>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#ifndef GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H
#define GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_work_serializer_trace;
class WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Orphan() override;
private:
void DrainQueue();
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
Atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_;
};
// WorkSerializer is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a WorkSerializer instance will be executed
// serially in a borrowed thread. The API provides a FIFO guarantee to the
// execution of callbacks scheduled on the thread.
class WorkSerializer {
public:
WorkSerializer() { impl_ = MakeOrphanable<WorkSerializerImpl>(); }
// TODO(yashkt): Replace grpc_core::DebugLocation with absl::SourceLocation
// once we can start using it directly.
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Run(callback, location);
}
private:
OrphanablePtr<WorkSerializerImpl> impl_;
};
} /* namespace grpc_core */
#endif /* GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H */

@ -270,7 +270,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/is_epollexclusive_available.cc',
'src/core/lib/iomgr/load_file.cc',
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/logical_thread.cc',
'src/core/lib/iomgr/poller/eventmanager_libuv.cc',
'src/core/lib/iomgr/polling_entity.cc',
'src/core/lib/iomgr/pollset.cc',
@ -324,6 +323,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/wakeup_fd_nospecial.cc',
'src/core/lib/iomgr/wakeup_fd_pipe.cc',
'src/core/lib/iomgr/wakeup_fd_posix.cc',
'src/core/lib/iomgr/work_serializer.cc',
'src/core/lib/json/json.cc',
'src/core/lib/json/json_reader.cc',
'src/core/lib/json/json_reader_new.cc',

@ -135,8 +135,8 @@ grpc_cc_test(
)
grpc_cc_test(
name = "logical_thread_test",
srcs = ["logical_thread_test.cc"],
name = "work_serializer_test",
srcs = ["work_serializer_test.cc"],
exec_properties = LARGE_MACHINE,
external_deps = [
"gtest",

@ -26,28 +26,25 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "test/core/util/test_config.h"
namespace {
TEST(LogicalThreadTest, NoOp) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
}
TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
TEST(LogicalThreadTest, ExecuteOne) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
TEST(WorkSerializerTest, ExecuteOne) {
grpc_core::WorkSerializer lock;
gpr_event done;
gpr_event_init(&done);
lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
lock.Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
class TestThread {
public:
explicit TestThread(grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock)
: lock_(std::move(lock)),
thread_("grpc_execute_many", ExecuteManyLoop, this) {
explicit TestThread(grpc_core::WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
}
@ -86,18 +83,18 @@ class TestThread {
DEBUG_LOCATION);
}
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock_;
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(LogicalThreadTest, ExecuteMany) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
TEST(WorkSerializerTest, ExecuteMany) {
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < 100; ++i) {
threads.push_back(std::unique_ptr<TestThread>(new TestThread(lock)));
threads.push_back(std::unique_ptr<TestThread>(new TestThread(&lock)));
}
}
}

@ -1160,7 +1160,6 @@ src/core/lib/iomgr/iomgr_posix.h \
src/core/lib/iomgr/is_epollexclusive_available.h \
src/core/lib/iomgr/load_file.h \
src/core/lib/iomgr/lockfree_event.h \
src/core/lib/iomgr/logical_thread.h \
src/core/lib/iomgr/nameser.h \
src/core/lib/iomgr/poller/eventmanager_libuv.h \
src/core/lib/iomgr/polling_entity.h \
@ -1201,6 +1200,7 @@ src/core/lib/iomgr/udp_server.h \
src/core/lib/iomgr/unix_sockets_posix.h \
src/core/lib/iomgr/wakeup_fd_pipe.h \
src/core/lib/iomgr/wakeup_fd_posix.h \
src/core/lib/iomgr/work_serializer.h \
src/core/lib/json/json.h \
src/core/lib/profiling/timers.h \
src/core/lib/slice/b64.h \

@ -1368,8 +1368,6 @@ src/core/lib/iomgr/load_file.cc \
src/core/lib/iomgr/load_file.h \
src/core/lib/iomgr/lockfree_event.cc \
src/core/lib/iomgr/lockfree_event.h \
src/core/lib/iomgr/logical_thread.cc \
src/core/lib/iomgr/logical_thread.h \
src/core/lib/iomgr/nameser.h \
src/core/lib/iomgr/poller/eventmanager_libuv.cc \
src/core/lib/iomgr/poller/eventmanager_libuv.h \
@ -1463,6 +1461,8 @@ src/core/lib/iomgr/wakeup_fd_pipe.cc \
src/core/lib/iomgr/wakeup_fd_pipe.h \
src/core/lib/iomgr/wakeup_fd_posix.cc \
src/core/lib/iomgr/wakeup_fd_posix.h \
src/core/lib/iomgr/work_serializer.cc \
src/core/lib/iomgr/work_serializer.h \
src/core/lib/json/json.cc \
src/core/lib/json/json.h \
src/core/lib/json/json_reader.cc \

@ -5043,30 +5043,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 10,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "logical_thread_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -6058,6 +6034,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 10,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "work_serializer_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save