Merge pull request #20119 from markdroth/c++_mpscq

Convert mpscq API from C to C++.
pull/15577/head^2
Mark D. Roth 5 years ago committed by GitHub
commit d1bca6ff92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      BUILD
  2. 6
      BUILD.gn
  3. 75
      CMakeLists.txt
  4. 86
      Makefile
  5. 26
      build.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 6
      gRPC-Core.podspec
  10. 4
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 4
      package.xml
  13. 117
      src/core/lib/gpr/mpscq.cc
  14. 88
      src/core/lib/gpr/mpscq.h
  15. 108
      src/core/lib/gprpp/mpscq.cc
  16. 98
      src/core/lib/gprpp/mpscq.h
  17. 9
      src/core/lib/iomgr/call_combiner.cc
  18. 4
      src/core/lib/iomgr/call_combiner.h
  19. 11
      src/core/lib/iomgr/closure.h
  20. 17
      src/core/lib/iomgr/combiner.cc
  21. 1
      src/core/lib/iomgr/combiner.h
  22. 23
      src/core/lib/surface/completion_queue.cc
  23. 5
      src/core/lib/surface/completion_queue.h
  24. 33
      src/core/lib/surface/server.cc
  25. 2
      src/python/grpcio/grpc_core_dependencies.py
  26. 11
      test/core/gpr/BUILD
  27. 11
      test/core/gprpp/BUILD
  28. 37
      test/core/gprpp/mpscq_test.cc
  29. 2
      tools/doxygen/Doxyfile.c++.internal
  30. 4
      tools/doxygen/Doxyfile.core.internal
  31. 48
      tools/run_tests/generated/tests.json

@ -472,7 +472,6 @@ grpc_cc_library(
"src/core/lib/gpr/log_linux.cc",
"src/core/lib/gpr/log_posix.cc",
"src/core/lib/gpr/log_windows.cc",
"src/core/lib/gpr/mpscq.cc",
"src/core/lib/gpr/murmur_hash.cc",
"src/core/lib/gpr/string.cc",
"src/core/lib/gpr/string_posix.cc",
@ -494,6 +493,7 @@ grpc_cc_library(
"src/core/lib/gprpp/fork.cc",
"src/core/lib/gprpp/global_config_env.cc",
"src/core/lib/gprpp/host_port.cc",
"src/core/lib/gprpp/mpscq.cc",
"src/core/lib/gprpp/thd_posix.cc",
"src/core/lib/gprpp/thd_windows.cc",
"src/core/lib/profiling/basic_timers.cc",
@ -503,7 +503,6 @@ grpc_cc_library(
"src/core/lib/gpr/alloc.h",
"src/core/lib/gpr/arena.h",
"src/core/lib/gpr/env.h",
"src/core/lib/gpr/mpscq.h",
"src/core/lib/gpr/murmur_hash.h",
"src/core/lib/gpr/spinlock.h",
"src/core/lib/gpr/string.h",
@ -527,6 +526,7 @@ grpc_cc_library(
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mpscq.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/string_view.h",
"src/core/lib/gprpp/sync.h",

@ -104,8 +104,6 @@ config("grpc_config") {
"src/core/lib/gpr/log_linux.cc",
"src/core/lib/gpr/log_posix.cc",
"src/core/lib/gpr/log_windows.cc",
"src/core/lib/gpr/mpscq.cc",
"src/core/lib/gpr/mpscq.h",
"src/core/lib/gpr/murmur_hash.cc",
"src/core/lib/gpr/murmur_hash.h",
"src/core/lib/gpr/spinlock.h",
@ -150,6 +148,8 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mpscq.cc",
"src/core/lib/gprpp/mpscq.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
@ -1226,7 +1226,6 @@ config("grpc_config") {
"src/core/lib/gpr/alloc.h",
"src/core/lib/gpr/arena.h",
"src/core/lib/gpr/env.h",
"src/core/lib/gpr/mpscq.h",
"src/core/lib/gpr/murmur_hash.h",
"src/core/lib/gpr/spinlock.h",
"src/core/lib/gpr/string.h",
@ -1252,6 +1251,7 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mpscq.h",
"src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/pair.h",

@ -308,7 +308,6 @@ add_dependencies(buildtests_c gpr_env_test)
add_dependencies(buildtests_c gpr_host_port_test)
add_dependencies(buildtests_c gpr_log_test)
add_dependencies(buildtests_c gpr_manual_constructor_test)
add_dependencies(buildtests_c gpr_mpscq_test)
add_dependencies(buildtests_c gpr_spinlock_test)
add_dependencies(buildtests_c gpr_string_test)
add_dependencies(buildtests_c gpr_sync_test)
@ -628,6 +627,7 @@ add_dependencies(buildtests_cxx generic_end2end_test)
add_dependencies(buildtests_cxx global_config_env_test)
add_dependencies(buildtests_cxx global_config_test)
add_dependencies(buildtests_cxx golden_file_test)
add_dependencies(buildtests_cxx gprpp_mpscq_test)
add_dependencies(buildtests_cxx grpc_alts_credentials_options_test)
add_dependencies(buildtests_cxx grpc_cli)
add_dependencies(buildtests_cxx grpc_core_map_test)
@ -864,7 +864,6 @@ add_library(gpr
src/core/lib/gpr/log_linux.cc
src/core/lib/gpr/log_posix.cc
src/core/lib/gpr/log_windows.cc
src/core/lib/gpr/mpscq.cc
src/core/lib/gpr/murmur_hash.cc
src/core/lib/gpr/string.cc
src/core/lib/gpr/string_posix.cc
@ -886,6 +885,7 @@ add_library(gpr
src/core/lib/gprpp/fork.cc
src/core/lib/gprpp/global_config_env.cc
src/core/lib/gprpp/host_port.cc
src/core/lib/gprpp/mpscq.cc
src/core/lib/gprpp/thd_posix.cc
src/core/lib/gprpp/thd_windows.cc
src/core/lib/profiling/basic_timers.cc
@ -7084,37 +7084,6 @@ target_link_libraries(gpr_manual_constructor_test
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(gpr_mpscq_test
test/core/gpr/mpscq_test.cc
)
target_include_directories(gpr_mpscq_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_UPB_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(gpr_mpscq_test
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
grpc_test_util_unsecure
grpc_unsecure
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
@ -13346,6 +13315,46 @@ target_link_libraries(golden_file_test
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(gprpp_mpscq_test
test/core/gprpp/mpscq_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(gprpp_mpscq_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_UPB_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(gprpp_mpscq_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
grpc_test_util_unsecure
grpc_unsecure
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)

@ -1038,7 +1038,6 @@ gpr_env_test: $(BINDIR)/$(CONFIG)/gpr_env_test
gpr_host_port_test: $(BINDIR)/$(CONFIG)/gpr_host_port_test
gpr_log_test: $(BINDIR)/$(CONFIG)/gpr_log_test
gpr_manual_constructor_test: $(BINDIR)/$(CONFIG)/gpr_manual_constructor_test
gpr_mpscq_test: $(BINDIR)/$(CONFIG)/gpr_mpscq_test
gpr_spinlock_test: $(BINDIR)/$(CONFIG)/gpr_spinlock_test
gpr_string_test: $(BINDIR)/$(CONFIG)/gpr_string_test
gpr_sync_test: $(BINDIR)/$(CONFIG)/gpr_sync_test
@ -1219,6 +1218,7 @@ generic_end2end_test: $(BINDIR)/$(CONFIG)/generic_end2end_test
global_config_env_test: $(BINDIR)/$(CONFIG)/global_config_env_test
global_config_test: $(BINDIR)/$(CONFIG)/global_config_test
golden_file_test: $(BINDIR)/$(CONFIG)/golden_file_test
gprpp_mpscq_test: $(BINDIR)/$(CONFIG)/gprpp_mpscq_test
grpc_alts_credentials_options_test: $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test
grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli
grpc_core_map_test: $(BINDIR)/$(CONFIG)/grpc_core_map_test
@ -1478,7 +1478,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/gpr_host_port_test \
$(BINDIR)/$(CONFIG)/gpr_log_test \
$(BINDIR)/$(CONFIG)/gpr_manual_constructor_test \
$(BINDIR)/$(CONFIG)/gpr_mpscq_test \
$(BINDIR)/$(CONFIG)/gpr_spinlock_test \
$(BINDIR)/$(CONFIG)/gpr_string_test \
$(BINDIR)/$(CONFIG)/gpr_sync_test \
@ -1698,6 +1697,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/global_config_env_test \
$(BINDIR)/$(CONFIG)/global_config_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/gprpp_mpscq_test \
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
@ -1867,6 +1867,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/global_config_env_test \
$(BINDIR)/$(CONFIG)/global_config_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/gprpp_mpscq_test \
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
@ -2048,8 +2049,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/gpr_log_test || ( echo test gpr_log_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_manual_constructor_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_manual_constructor_test || ( echo test gpr_manual_constructor_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_mpscq_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_mpscq_test || ( echo test gpr_mpscq_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_spinlock_test"
$(Q) $(BINDIR)/$(CONFIG)/gpr_spinlock_test || ( echo test gpr_spinlock_test failed ; exit 1 )
$(E) "[RUN] Testing gpr_string_test"
@ -2376,6 +2375,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/global_config_test || ( echo test global_config_test failed ; exit 1 )
$(E) "[RUN] Testing golden_file_test"
$(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 )
$(E) "[RUN] Testing gprpp_mpscq_test"
$(Q) $(BINDIR)/$(CONFIG)/gprpp_mpscq_test || ( echo test gprpp_mpscq_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_alts_credentials_options_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test || ( echo test grpc_alts_credentials_options_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_core_map_test"
@ -3457,7 +3458,6 @@ LIBGPR_SRC = \
src/core/lib/gpr/log_linux.cc \
src/core/lib/gpr/log_posix.cc \
src/core/lib/gpr/log_windows.cc \
src/core/lib/gpr/mpscq.cc \
src/core/lib/gpr/murmur_hash.cc \
src/core/lib/gpr/string.cc \
src/core/lib/gpr/string_posix.cc \
@ -3479,6 +3479,7 @@ LIBGPR_SRC = \
src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/global_config_env.cc \
src/core/lib/gprpp/host_port.cc \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/thd_posix.cc \
src/core/lib/gprpp/thd_windows.cc \
src/core/lib/profiling/basic_timers.cc \
@ -9992,38 +9993,6 @@ endif
endif
GPR_MPSCQ_TEST_SRC = \
test/core/gpr/mpscq_test.cc \
GPR_MPSCQ_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GPR_MPSCQ_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/gpr_mpscq_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/gpr_mpscq_test: $(GPR_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(GPR_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/gpr_mpscq_test
endif
$(OBJDIR)/$(CONFIG)/test/core/gpr/mpscq_test.o: $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
deps_gpr_mpscq_test: $(GPR_MPSCQ_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(GPR_MPSCQ_TEST_OBJS:.o=.dep)
endif
endif
GPR_SPINLOCK_TEST_SRC = \
test/core/gpr/spinlock_test.cc \
@ -16736,6 +16705,49 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/codegen/golden_file_test.o: $(GENDIR)/src/proto/grpc/testing/compiler_test.pb.cc $(GENDIR)/src/proto/grpc/testing/compiler_test.grpc.pb.cc
GPRPP_MPSCQ_TEST_SRC = \
test/core/gprpp/mpscq_test.cc \
GPRPP_MPSCQ_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GPRPP_MPSCQ_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/gprpp_mpscq_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/gprpp_mpscq_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/gprpp_mpscq_test: $(PROTOBUF_DEP) $(GPRPP_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(GPRPP_MPSCQ_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/gprpp_mpscq_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/gprpp/mpscq_test.o: $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
deps_gprpp_mpscq_test: $(GPRPP_MPSCQ_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(GPRPP_MPSCQ_TEST_OBJS:.o=.dep)
endif
endif
GRPC_ALTS_CREDENTIALS_OPTIONS_TEST_SRC = \
test/core/security/grpc_alts_credentials_options_test.cc \

@ -217,7 +217,6 @@ filegroups:
- src/core/lib/gpr/log_linux.cc
- src/core/lib/gpr/log_posix.cc
- src/core/lib/gpr/log_windows.cc
- src/core/lib/gpr/mpscq.cc
- src/core/lib/gpr/murmur_hash.cc
- src/core/lib/gpr/string.cc
- src/core/lib/gpr/string_posix.cc
@ -239,6 +238,7 @@ filegroups:
- src/core/lib/gprpp/fork.cc
- src/core/lib/gprpp/global_config_env.cc
- src/core/lib/gprpp/host_port.cc
- src/core/lib/gprpp/mpscq.cc
- src/core/lib/gprpp/thd_posix.cc
- src/core/lib/gprpp/thd_windows.cc
- src/core/lib/profiling/basic_timers.cc
@ -268,7 +268,6 @@ filegroups:
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/arena.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/mpscq.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
@ -292,6 +291,7 @@ filegroups:
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/map.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mpscq.h
- src/core/lib/gprpp/pair.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
@ -2741,17 +2741,6 @@ targets:
- grpc_test_util_unsecure
- grpc_unsecure
uses_polling: false
- name: gpr_mpscq_test
cpu_cost: 30
build: test
language: c
src:
- test/core/gpr/mpscq_test.cc
deps:
- gpr
- grpc_test_util_unsecure
- grpc_unsecure
uses_polling: false
- name: gpr_spinlock_test
cpu_cost: 3
build: test
@ -5004,6 +4993,17 @@ targets:
args:
- --generated_file_path=gens/src/proto/grpc/testing/
uses_polling: false
- name: gprpp_mpscq_test
cpu_cost: 30
build: test
language: c++
src:
- test/core/gprpp/mpscq_test.cc
deps:
- gpr
- grpc_test_util_unsecure
- grpc_unsecure
uses_polling: false
- name: grpc_alts_credentials_options_test
build: test
language: c++

@ -63,7 +63,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/gpr/log_linux.cc \
src/core/lib/gpr/log_posix.cc \
src/core/lib/gpr/log_windows.cc \
src/core/lib/gpr/mpscq.cc \
src/core/lib/gpr/murmur_hash.cc \
src/core/lib/gpr/string.cc \
src/core/lib/gpr/string_posix.cc \
@ -85,6 +84,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/global_config_env.cc \
src/core/lib/gprpp/host_port.cc \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/thd_posix.cc \
src/core/lib/gprpp/thd_windows.cc \
src/core/lib/profiling/basic_timers.cc \

@ -33,7 +33,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\gpr\\log_linux.cc " +
"src\\core\\lib\\gpr\\log_posix.cc " +
"src\\core\\lib\\gpr\\log_windows.cc " +
"src\\core\\lib\\gpr\\mpscq.cc " +
"src\\core\\lib\\gpr\\murmur_hash.cc " +
"src\\core\\lib\\gpr\\string.cc " +
"src\\core\\lib\\gpr\\string_posix.cc " +
@ -55,6 +54,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\gprpp\\fork.cc " +
"src\\core\\lib\\gprpp\\global_config_env.cc " +
"src\\core\\lib\\gprpp\\host_port.cc " +
"src\\core\\lib\\gprpp\\mpscq.cc " +
"src\\core\\lib\\gprpp\\thd_posix.cc " +
"src\\core\\lib\\gprpp\\thd_windows.cc " +
"src\\core\\lib\\profiling\\basic_timers.cc " +

@ -288,7 +288,6 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/arena.h',
'src/core/lib/gpr/env.h',
'src/core/lib/gpr/mpscq.h',
'src/core/lib/gpr/murmur_hash.h',
'src/core/lib/gpr/spinlock.h',
'src/core/lib/gpr/string.h',
@ -312,6 +311,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',

@ -191,7 +191,6 @@ Pod::Spec.new do |s|
ss.source_files = 'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/arena.h',
'src/core/lib/gpr/env.h',
'src/core/lib/gpr/mpscq.h',
'src/core/lib/gpr/murmur_hash.h',
'src/core/lib/gpr/spinlock.h',
'src/core/lib/gpr/string.h',
@ -215,6 +214,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',
@ -233,7 +233,6 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/log_linux.cc',
'src/core/lib/gpr/log_posix.cc',
'src/core/lib/gpr/log_windows.cc',
'src/core/lib/gpr/mpscq.cc',
'src/core/lib/gpr/murmur_hash.cc',
'src/core/lib/gpr/string.cc',
'src/core/lib/gpr/string_posix.cc',
@ -255,6 +254,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/global_config_env.cc',
'src/core/lib/gprpp/host_port.cc',
'src/core/lib/gprpp/mpscq.cc',
'src/core/lib/gprpp/thd_posix.cc',
'src/core/lib/gprpp/thd_windows.cc',
'src/core/lib/profiling/basic_timers.cc',
@ -967,7 +967,6 @@ Pod::Spec.new do |s|
ss.private_header_files = 'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/arena.h',
'src/core/lib/gpr/env.h',
'src/core/lib/gpr/mpscq.h',
'src/core/lib/gpr/murmur_hash.h',
'src/core/lib/gpr/spinlock.h',
'src/core/lib/gpr/string.h',
@ -991,6 +990,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',

@ -85,7 +85,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gpr/alloc.h )
s.files += %w( src/core/lib/gpr/arena.h )
s.files += %w( src/core/lib/gpr/env.h )
s.files += %w( src/core/lib/gpr/mpscq.h )
s.files += %w( src/core/lib/gpr/murmur_hash.h )
s.files += %w( src/core/lib/gpr/spinlock.h )
s.files += %w( src/core/lib/gpr/string.h )
@ -109,6 +108,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/manual_constructor.h )
s.files += %w( src/core/lib/gprpp/map.h )
s.files += %w( src/core/lib/gprpp/memory.h )
s.files += %w( src/core/lib/gprpp/mpscq.h )
s.files += %w( src/core/lib/gprpp/pair.h )
s.files += %w( src/core/lib/gprpp/sync.h )
s.files += %w( src/core/lib/gprpp/thd.h )
@ -127,7 +127,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gpr/log_linux.cc )
s.files += %w( src/core/lib/gpr/log_posix.cc )
s.files += %w( src/core/lib/gpr/log_windows.cc )
s.files += %w( src/core/lib/gpr/mpscq.cc )
s.files += %w( src/core/lib/gpr/murmur_hash.cc )
s.files += %w( src/core/lib/gpr/string.cc )
s.files += %w( src/core/lib/gpr/string_posix.cc )
@ -149,6 +148,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/fork.cc )
s.files += %w( src/core/lib/gprpp/global_config_env.cc )
s.files += %w( src/core/lib/gprpp/host_port.cc )
s.files += %w( src/core/lib/gprpp/mpscq.cc )
s.files += %w( src/core/lib/gprpp/thd_posix.cc )
s.files += %w( src/core/lib/gprpp/thd_windows.cc )
s.files += %w( src/core/lib/profiling/basic_timers.cc )

@ -218,7 +218,6 @@
'src/core/lib/gpr/log_linux.cc',
'src/core/lib/gpr/log_posix.cc',
'src/core/lib/gpr/log_windows.cc',
'src/core/lib/gpr/mpscq.cc',
'src/core/lib/gpr/murmur_hash.cc',
'src/core/lib/gpr/string.cc',
'src/core/lib/gpr/string_posix.cc',
@ -240,6 +239,7 @@
'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/global_config_env.cc',
'src/core/lib/gprpp/host_port.cc',
'src/core/lib/gprpp/mpscq.cc',
'src/core/lib/gprpp/thd_posix.cc',
'src/core/lib/gprpp/thd_windows.cc',
'src/core/lib/profiling/basic_timers.cc',

@ -90,7 +90,6 @@
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/arena.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/env.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/murmur_hash.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/spinlock.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/string.h" role="src" />
@ -114,6 +113,7 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/sync.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" />
@ -132,7 +132,6 @@
<file baseinstalldir="/" name="src/core/lib/gpr/log_linux.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/log_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/log_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/mpscq.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/murmur_hash.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/string.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/string_posix.cc" role="src" />
@ -154,6 +153,7 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/fork.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/global_config_env.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/host_port.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mpscq.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/profiling/basic_timers.cc" role="src" />

@ -1,117 +0,0 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/gpr/mpscq.h"
#include <grpc/support/log.h>
void gpr_mpscq_init(gpr_mpscq* q) {
gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
q->tail = &q->stub;
gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
}
void gpr_mpscq_destroy(gpr_mpscq* q) {
GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub);
GPR_ASSERT(q->tail == &q->stub);
}
bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node* prev =
(gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
return prev == &q->stub;
}
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) {
bool empty;
return gpr_mpscq_pop_and_check_end(q, &empty);
}
gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) {
gpr_mpscq_node* tail = q->tail;
gpr_mpscq_node* next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
if (tail == &q->stub) {
// indicates the list is actually (ephemerally) empty
if (next == nullptr) {
*empty = true;
return nullptr;
}
q->tail = next;
tail = next;
next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
}
if (next != nullptr) {
*empty = false;
q->tail = next;
return tail;
}
gpr_mpscq_node* head = (gpr_mpscq_node*)gpr_atm_acq_load(&q->head);
if (tail != head) {
*empty = false;
// indicates a retry is in order: we're still adding
return nullptr;
}
gpr_mpscq_push(q, &q->stub);
next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next);
if (next != nullptr) {
*empty = false;
q->tail = next;
return tail;
}
// indicates a retry is in order: we're still adding
*empty = false;
return nullptr;
}
void gpr_locked_mpscq_init(gpr_locked_mpscq* q) {
gpr_mpscq_init(&q->queue);
gpr_mu_init(&q->mu);
}
void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) {
gpr_mpscq_destroy(&q->queue);
gpr_mu_destroy(&q->mu);
}
bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) {
return gpr_mpscq_push(&q->queue, n);
}
gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) {
if (gpr_mu_trylock(&q->mu)) {
gpr_mpscq_node* n = gpr_mpscq_pop(&q->queue);
gpr_mu_unlock(&q->mu);
return n;
}
return nullptr;
}
gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) {
gpr_mu_lock(&q->mu);
bool empty = false;
gpr_mpscq_node* n;
do {
n = gpr_mpscq_pop_and_check_end(&q->queue, &empty);
} while (n == nullptr && !empty);
gpr_mu_unlock(&q->mu);
return n;
}

@ -1,88 +0,0 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPR_MPSCQ_H
#define GRPC_CORE_LIB_GPR_MPSCQ_H
#include <grpc/support/port_platform.h>
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <stdbool.h>
#include <stddef.h>
// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
// List node (include this in a data structure at the top, and add application
// fields after it - to simulate inheritance)
typedef struct gpr_mpscq_node {
gpr_atm next;
} gpr_mpscq_node;
// Actual queue type
typedef struct gpr_mpscq {
// make sure head & tail don't share a cacheline
union {
char padding[GPR_CACHELINE_SIZE];
gpr_atm head;
};
gpr_mpscq_node* tail;
gpr_mpscq_node stub;
} gpr_mpscq;
void gpr_mpscq_init(gpr_mpscq* q);
void gpr_mpscq_destroy(gpr_mpscq* q);
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty);
// An mpscq with a lock: it's safe to pop from multiple threads, but doing
// only one thread will succeed concurrently
typedef struct gpr_locked_mpscq {
gpr_mpscq queue;
gpr_mu mu;
} gpr_locked_mpscq;
void gpr_locked_mpscq_init(gpr_locked_mpscq* q);
void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q);
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread safe - can be called from multiple threads concurrently
gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q);
// Pop a node. Returns NULL only if the queue was empty at some point after
// calling this function
gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q);
#endif /* GRPC_CORE_LIB_GPR_MPSCQ_H */

@ -0,0 +1,108 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/mpscq.h"
namespace grpc_core {
//
// MultiProducerSingleConsumerQueue
//
bool MultiProducerSingleConsumerQueue::Push(Node* node) {
node->next.Store(nullptr, MemoryOrder::RELAXED);
Node* prev = head_.Exchange(node, MemoryOrder::ACQ_REL);
prev->next.Store(node, MemoryOrder::RELEASE);
return prev == &stub_;
}
MultiProducerSingleConsumerQueue::Node*
MultiProducerSingleConsumerQueue::Pop() {
bool empty;
return PopAndCheckEnd(&empty);
}
MultiProducerSingleConsumerQueue::Node*
MultiProducerSingleConsumerQueue::PopAndCheckEnd(bool* empty) {
Node* tail = tail_;
Node* next = tail_->next.Load(MemoryOrder::ACQUIRE);
if (tail == &stub_) {
// indicates the list is actually (ephemerally) empty
if (next == nullptr) {
*empty = true;
return nullptr;
}
tail_ = next;
tail = next;
next = tail->next.Load(MemoryOrder::ACQUIRE);
}
if (next != nullptr) {
*empty = false;
tail_ = next;
return tail;
}
Node* head = head_.Load(MemoryOrder::ACQUIRE);
if (tail != head) {
*empty = false;
// indicates a retry is in order: we're still adding
return nullptr;
}
Push(&stub_);
next = tail->next.Load(MemoryOrder::ACQUIRE);
if (next != nullptr) {
*empty = false;
tail_ = next;
return tail;
}
// indicates a retry is in order: we're still adding
*empty = false;
return nullptr;
}
//
// LockedMultiProducerSingleConsumerQueue
//
bool LockedMultiProducerSingleConsumerQueue::Push(Node* node) {
return queue_.Push(node);
}
LockedMultiProducerSingleConsumerQueue::Node*
LockedMultiProducerSingleConsumerQueue::TryPop() {
if (gpr_mu_trylock(mu_.get())) {
Node* node = queue_.Pop();
gpr_mu_unlock(mu_.get());
return node;
}
return nullptr;
}
LockedMultiProducerSingleConsumerQueue::Node*
LockedMultiProducerSingleConsumerQueue::Pop() {
MutexLock lock(&mu_);
bool empty = false;
Node* node;
do {
node = queue_.PopAndCheckEnd(&empty);
} while (node == nullptr && !empty);
return node;
}
} // namespace grpc_core

@ -0,0 +1,98 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_MPSCQ_H
#define GRPC_CORE_LIB_GPRPP_MPSCQ_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h"
#include <grpc/support/log.h>
namespace grpc_core {
// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
class MultiProducerSingleConsumerQueue {
public:
// List node. Application node types can inherit from this.
struct Node {
Atomic<Node*> next;
};
MultiProducerSingleConsumerQueue() : head_{&stub_}, tail_(&stub_) {}
~MultiProducerSingleConsumerQueue() {
GPR_ASSERT(head_.Load(MemoryOrder::RELAXED) == &stub_);
GPR_ASSERT(tail_ == &stub_);
}
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool Push(Node* node);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread compatible - can only be called from one thread at a time
Node* Pop();
// Pop a node; sets *empty to true if the queue is empty, or false if it is
// not.
Node* PopAndCheckEnd(bool* empty);
private:
// make sure head & tail don't share a cacheline
union {
char padding_[GPR_CACHELINE_SIZE];
Atomic<Node*> head_;
};
Node* tail_;
Node stub_;
};
// An mpscq with a lock: it's safe to pop from multiple threads, but doing
// only one thread will succeed concurrently.
class LockedMultiProducerSingleConsumerQueue {
public:
typedef MultiProducerSingleConsumerQueue::Node Node;
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool Push(Node* node);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread safe - can be called from multiple threads concurrently
Node* TryPop();
// Pop a node. Returns NULL only if the queue was empty at some point after
// calling this function
Node* Pop();
private:
MultiProducerSingleConsumerQueue queue_;
Mutex mu_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_MPSCQ_H */

@ -48,7 +48,6 @@ gpr_atm EncodeCancelStateError(grpc_error* error) {
CallCombiner::CallCombiner() {
gpr_atm_no_barrier_store(&cancel_state_, 0);
gpr_atm_no_barrier_store(&size_, 0);
gpr_mpscq_init(&queue_);
#ifdef GRPC_TSAN_ENABLED
GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
grpc_schedule_on_exec_ctx);
@ -56,7 +55,6 @@ CallCombiner::CallCombiner() {
}
CallCombiner::~CallCombiner() {
gpr_mpscq_destroy(&queue_);
GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
}
@ -140,7 +138,8 @@ void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
}
// Queue was not empty, so add closure to queue.
closure->error_data.error = error;
gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(closure));
queue_.Push(
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
}
}
@ -163,8 +162,8 @@ void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
gpr_log(GPR_INFO, " checking queue");
}
bool empty;
grpc_closure* closure = reinterpret_cast<grpc_closure*>(
gpr_mpscq_pop_and_check_end(&queue_, &empty));
grpc_closure* closure =
reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
if (closure == nullptr) {
// This can happen either due to a race condition within the mpscq
// code or because of a race with Start().

@ -25,8 +25,8 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
@ -108,7 +108,7 @@ class CallCombiner {
#endif
gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
gpr_mpscq queue_;
MultiProducerSingleConsumerQueue queue_;
// Either 0 (if not cancelled and no cancellation closure set),
// a grpc_closure* (if the lowest bit is 0),
// or a grpc_error* (if the lowest bit is 1).

@ -22,10 +22,13 @@
#include <grpc/support/port_platform.h>
#include <assert.h>
#include <stdbool.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <stdbool.h>
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/profiling/timers.h"
@ -69,7 +72,9 @@ struct grpc_closure {
* space */
union {
grpc_closure* next;
gpr_mpscq_node atm_next;
grpc_core::ManualConstructor<
grpc_core::MultiProducerSingleConsumerQueue::Node>
mpscq_node;
uintptr_t scratch;
} next_data;

@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
@ -45,10 +46,10 @@ grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
#define STATE_ELEM_COUNT_LOW_BIT 2
struct grpc_combiner {
grpc_combiner* next_combiner_on_this_exec_ctx;
grpc_combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
gpr_mpscq queue;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
@ -58,7 +59,7 @@ struct grpc_combiner {
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
@ -76,12 +77,11 @@ static const grpc_closure_scheduler_vtable finally_scheduler = {
static void offload(void* arg, grpc_error* error);
grpc_combiner* grpc_combiner_create(void) {
grpc_combiner* lock = static_cast<grpc_combiner*>(gpr_zalloc(sizeof(*lock)));
grpc_combiner* lock = grpc_core::New<grpc_combiner>();
gpr_ref_init(&lock->refs, 1);
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(
&lock->offload, offload, lock,
@ -93,8 +93,7 @@ grpc_combiner* grpc_combiner_create(void) {
static void really_destroy(grpc_combiner* lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue);
gpr_free(lock);
grpc_core::Delete(lock);
}
static void start_destroy(grpc_combiner* lock) {
@ -185,7 +184,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
assert(cl->cb);
cl->error_data.error = error;
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
lock->queue.Push(cl->next_data.mpscq_node.get());
}
static void move_next() {
@ -249,7 +248,7 @@ bool grpc_combiner_continue_exec_ctx() {
// peek to see if something new has shown up, and execute that with
// priority
(gpr_atm_acq_load(&lock->state) >> 1) > 1) {
gpr_mpscq_node* n = gpr_mpscq_pop(&lock->queue);
grpc_core::MultiProducerSingleConsumerQueue::Node* n = lock->queue.Pop();
GRPC_COMBINER_TRACE(
gpr_log(GPR_INFO, "C:%p maybe_finish_one n=%p", lock, n));
if (n == nullptr) {

@ -25,7 +25,6 @@
#include <grpc/support/atm.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/iomgr/exec_ctx.h"
// Provides serialized access to some resource.

@ -210,14 +210,14 @@ struct cq_vtable {
namespace {
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
* (a lockfree multiproducer single consumer queue). It uses a queue_lock
* to support multiple consumers.
/* Queue that holds the cq_completion_events. Internally uses
* MultiProducerSingleConsumerQueue (a lockfree multiproducer single consumer
* queue). It uses a queue_lock to support multiple consumers.
* Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
class CqEventQueue {
public:
CqEventQueue() { gpr_mpscq_init(&queue_); }
~CqEventQueue() { gpr_mpscq_destroy(&queue_); }
CqEventQueue() = default;
~CqEventQueue() = default;
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
@ -232,7 +232,7 @@ class CqEventQueue {
/* Spinlock to serialize consumers i.e pop() operations */
gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
gpr_mpscq queue_;
grpc_core::MultiProducerSingleConsumerQueue queue_;
/* A lazy counter of number of items in the queue. This is NOT atomically
incremented/decremented along with push/pop operations and hence is only
@ -462,7 +462,8 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
}
bool CqEventQueue::Push(grpc_cq_completion* c) {
gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(c));
queue_.Push(
reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
}
@ -473,8 +474,7 @@ grpc_cq_completion* CqEventQueue::Pop() {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
c = reinterpret_cast<grpc_cq_completion*>(
gpr_mpscq_pop_and_check_end(&queue_, &is_empty));
c = reinterpret_cast<grpc_cq_completion*>(queue_.PopAndCheckEnd(&is_empty));
gpr_spinlock_unlock(&queue_lock_);
if (c == nullptr && !is_empty) {
@ -1007,8 +1007,9 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
even if the queue is not empty. If so, keep retrying but do not
return GRPC_QUEUE_SHUTDOWN */
if (cqd->queue.num_items() > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending

@ -24,8 +24,10 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/pollset.h"
/* These trace flags default to 1. The corresponding lines are only traced
@ -36,7 +38,8 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount;
typedef struct grpc_cq_completion {
gpr_mpscq_node node;
grpc_core::ManualConstructor<grpc_core::MultiProducerSingleConsumerQueue>
node;
/** user supplied tag */
void* tag;

@ -34,9 +34,9 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
@ -50,6 +50,8 @@
grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
using grpc_core::LockedMultiProducerSingleConsumerQueue;
static void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
static void server_recv_trailing_metadata_ready(void* user_data,
grpc_error* error);
@ -70,7 +72,9 @@ enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
struct registered_method;
struct requested_call {
gpr_mpscq_node request_link; /* must be first */
grpc_core::ManualConstructor<
grpc_core::MultiProducerSingleConsumerQueue::Node>
mpscq_node;
requested_call_type type;
size_t cq_idx;
void* tag;
@ -198,7 +202,7 @@ struct request_matcher {
grpc_server* server;
call_data* pending_head;
call_data* pending_tail;
gpr_locked_mpscq* requests_per_cq;
LockedMultiProducerSingleConsumerQueue* requests_per_cq;
};
struct registered_method {
@ -350,17 +354,17 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
static void request_matcher_init(request_matcher* rm, grpc_server* server) {
rm->server = server;
rm->pending_head = rm->pending_tail = nullptr;
rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
rm->requests_per_cq = static_cast<LockedMultiProducerSingleConsumerQueue*>(
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
new (&rm->requests_per_cq[i]) LockedMultiProducerSingleConsumerQueue();
}
}
static void request_matcher_destroy(request_matcher* rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
GPR_ASSERT(rm->requests_per_cq[i].Pop() == nullptr);
rm->requests_per_cq[i].~LockedMultiProducerSingleConsumerQueue();
}
gpr_free(rm->requests_per_cq);
}
@ -389,7 +393,7 @@ static void request_matcher_kill_requests(grpc_server* server,
requested_call* rc;
for (size_t i = 0; i < server->cq_count; i++) {
while ((rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
rm->requests_per_cq[i].Pop())) != nullptr) {
fail_call(server, i, rc, GRPC_ERROR_REF(error));
}
}
@ -534,8 +538,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
requested_call* rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
requested_call* rc =
reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].TryPop());
if (rc == nullptr) {
continue;
} else {
@ -556,8 +560,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
// added to the pending list.
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
requested_call* rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
requested_call* rc =
reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
if (rc == nullptr) {
continue;
} else {
@ -1430,13 +1434,12 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
if (rm->requests_per_cq[cq_idx].Push(rc->mpscq_node.get())) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != nullptr) {
rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
rc = reinterpret_cast<requested_call*>(rm->requests_per_cq[cq_idx].Pop());
if (rc == nullptr) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);

@ -32,7 +32,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/gpr/log_linux.cc',
'src/core/lib/gpr/log_posix.cc',
'src/core/lib/gpr/log_windows.cc',
'src/core/lib/gpr/mpscq.cc',
'src/core/lib/gpr/murmur_hash.cc',
'src/core/lib/gpr/string.cc',
'src/core/lib/gpr/string_posix.cc',
@ -54,6 +53,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/global_config_env.cc',
'src/core/lib/gprpp/host_port.cc',
'src/core/lib/gprpp/mpscq.cc',
'src/core/lib/gprpp/thd_posix.cc',
'src/core/lib/gprpp/thd_windows.cc',
'src/core/lib/profiling/basic_timers.cc',

@ -68,17 +68,6 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "mpscq_test",
srcs = ["mpscq_test.cc"],
exec_compatible_with = ["//third_party/toolchains/machine_size:large"],
language = "C++",
deps = [
"//:gpr",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "murmur_hash_test",
srcs = ["murmur_hash_test.cc"],

@ -113,6 +113,17 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "mpscq_test",
srcs = ["mpscq_test.cc"],
exec_compatible_with = ["//third_party/toolchains/machine_size:large"],
language = "C++",
deps = [
"//:gpr",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "optional_test",
srcs = ["optional_test.cc"],

@ -16,7 +16,7 @@
*
*/
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/mpscq.h"
#include <inttypes.h>
#include <stdlib.h>
@ -29,14 +29,16 @@
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
using grpc_core::MultiProducerSingleConsumerQueue;
typedef struct test_node {
gpr_mpscq_node node;
MultiProducerSingleConsumerQueue::Node node;
size_t i;
size_t* ctr;
} test_node;
static test_node* new_node(size_t i, size_t* ctr) {
test_node* n = static_cast<test_node*>(gpr_malloc(sizeof(test_node)));
test_node* n = grpc_core::New<test_node>();
n->i = i;
n->ctr = ctr;
return n;
@ -44,13 +46,12 @@ static test_node* new_node(size_t i, size_t* ctr) {
static void test_serial(void) {
gpr_log(GPR_DEBUG, "test_serial");
gpr_mpscq q;
gpr_mpscq_init(&q);
MultiProducerSingleConsumerQueue q;
for (size_t i = 0; i < 10000000; i++) {
gpr_mpscq_push(&q, &new_node(i, nullptr)->node);
q.Push(&new_node(i, nullptr)->node);
}
for (size_t i = 0; i < 10000000; i++) {
test_node* n = reinterpret_cast<test_node*>(gpr_mpscq_pop(&q));
test_node* n = reinterpret_cast<test_node*>(q.Pop());
GPR_ASSERT(n);
GPR_ASSERT(n->i == i);
gpr_free(n);
@ -59,7 +60,7 @@ static void test_serial(void) {
typedef struct {
size_t ctr;
gpr_mpscq* q;
MultiProducerSingleConsumerQueue* q;
gpr_event* start;
} thd_args;
@ -69,7 +70,7 @@ static void test_thread(void* args) {
thd_args* a = static_cast<thd_args*>(args);
gpr_event_wait(a->start, gpr_inf_future(GPR_CLOCK_REALTIME));
for (size_t i = 1; i <= THREAD_ITERATIONS; i++) {
gpr_mpscq_push(a->q, &new_node(i, &a->ctr)->node);
a->q->Push(&new_node(i, &a->ctr)->node);
}
}
@ -79,8 +80,7 @@ static void test_mt(void) {
gpr_event_init(&start);
grpc_core::Thread thds[100];
thd_args ta[GPR_ARRAY_SIZE(thds)];
gpr_mpscq q;
gpr_mpscq_init(&q);
MultiProducerSingleConsumerQueue q;
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
ta[i].ctr = 0;
ta[i].q = &q;
@ -92,8 +92,8 @@ static void test_mt(void) {
size_t spins = 0;
gpr_event_set(&start, (void*)1);
while (num_done != GPR_ARRAY_SIZE(thds)) {
gpr_mpscq_node* n;
while ((n = gpr_mpscq_pop(&q)) == nullptr) {
MultiProducerSingleConsumerQueue::Node* n;
while ((n = q.Pop()) == nullptr) {
spins++;
}
test_node* tn = reinterpret_cast<test_node*>(n);
@ -106,7 +106,6 @@ static void test_mt(void) {
for (auto& th : thds) {
th.Join();
}
gpr_mpscq_destroy(&q);
}
typedef struct {
@ -115,7 +114,7 @@ typedef struct {
gpr_mu mu;
size_t num_done;
size_t spins;
gpr_mpscq* q;
MultiProducerSingleConsumerQueue* q;
gpr_event* start;
} pull_args;
@ -129,8 +128,8 @@ static void pull_thread(void* arg) {
gpr_mu_unlock(&pa->mu);
return;
}
gpr_mpscq_node* n;
while ((n = gpr_mpscq_pop(pa->q)) == nullptr) {
MultiProducerSingleConsumerQueue::Node* n;
while ((n = pa->q->Pop()) == nullptr) {
pa->spins++;
}
test_node* tn = reinterpret_cast<test_node*>(n);
@ -149,8 +148,7 @@ static void test_mt_multipop(void) {
grpc_core::Thread thds[50];
grpc_core::Thread pull_thds[50];
thd_args ta[GPR_ARRAY_SIZE(thds)];
gpr_mpscq q;
gpr_mpscq_init(&q);
MultiProducerSingleConsumerQueue q;
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
ta[i].ctr = 0;
ta[i].q = &q;
@ -179,7 +177,6 @@ static void test_mt_multipop(void) {
th.Join();
}
gpr_mu_destroy(&pa.mu);
gpr_mpscq_destroy(&q);
}
int main(int argc, char** argv) {

@ -1090,7 +1090,6 @@ src/core/lib/debug/trace.h \
src/core/lib/gpr/alloc.h \
src/core/lib/gpr/arena.h \
src/core/lib/gpr/env.h \
src/core/lib/gpr/mpscq.h \
src/core/lib/gpr/murmur_hash.h \
src/core/lib/gpr/spinlock.h \
src/core/lib/gpr/string.h \
@ -1116,6 +1115,7 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mpscq.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \

@ -1203,8 +1203,6 @@ src/core/lib/gpr/log_android.cc \
src/core/lib/gpr/log_linux.cc \
src/core/lib/gpr/log_posix.cc \
src/core/lib/gpr/log_windows.cc \
src/core/lib/gpr/mpscq.cc \
src/core/lib/gpr/mpscq.h \
src/core/lib/gpr/murmur_hash.cc \
src/core/lib/gpr/murmur_hash.h \
src/core/lib/gpr/spinlock.h \
@ -1252,6 +1250,8 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/mpscq.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \

@ -955,30 +955,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 30,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "gpr_mpscq_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -4739,6 +4715,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 30,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "gprpp_mpscq_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save