Add shim to support condition variable wakeup fds where pipe/eventfd is not available

pull/7664/head
Ken Payson 9 years ago
parent 93b09478f0
commit 31caabdead
  1. 12
      BUILD
  2. 5
      CMakeLists.txt
  3. 42
      Makefile
  4. 1
      binding.gyp
  5. 16
      build.yaml
  6. 1
      config.m4
  7. 3
      gRPC-Core.podspec
  8. 2
      grpc.gemspec
  9. 2
      package.xml
  10. 328
      src/core/lib/iomgr/wakeup_fd_cv.c
  11. 58
      src/core/lib/iomgr/wakeup_fd_cv.h
  12. 9
      src/core/lib/iomgr/wakeup_fd_pipe.c
  13. 15
      src/core/lib/iomgr/wakeup_fd_posix.c
  14. 1
      src/core/lib/iomgr/wakeup_fd_posix.h
  15. 1
      src/python/grpcio/grpc_core_dependencies.py
  16. 246
      test/core/iomgr/wakeup_fd_cv_test.c
  17. 2
      tools/doxygen/Doxyfile.c++.internal
  18. 2
      tools/doxygen/Doxyfile.core.internal
  19. 19
      tools/run_tests/sources_and_headers.json
  20. 19
      tools/run_tests/tests.json
  21. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  22. 6
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  23. 3
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  24. 6
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
  25. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj
  26. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  27. 3
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
  28. 6
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
  29. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  30. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

12
BUILD

@ -215,6 +215,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -372,6 +373,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -610,6 +612,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -753,6 +756,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -962,6 +966,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -1096,6 +1101,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -1309,6 +1315,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -1423,6 +1430,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -1719,6 +1727,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -1828,6 +1837,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -2218,6 +2228,7 @@ objc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@ -2435,6 +2446,7 @@ objc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",

@ -345,6 +345,7 @@ add_library(grpc
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@ -603,6 +604,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@ -834,6 +836,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@ -1091,6 +1094,7 @@ add_library(grpc++
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@ -1448,6 +1452,7 @@ add_library(grpc++_unsecure
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c

@ -1021,6 +1021,7 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test
uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test
wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
@ -1329,6 +1330,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
$(BINDIR)/$(CONFIG)/uri_parser_test \
$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test \
$(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \
$(BINDIR)/$(CONFIG)/badreq_bad_client_test \
$(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \
@ -1717,6 +1719,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 )
$(E) "[RUN] Testing uri_parser_test"
$(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 )
$(E) "[RUN] Testing wakeup_fd_cv_test"
$(Q) $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test || ( echo test wakeup_fd_cv_test failed ; exit 1 )
$(E) "[RUN] Testing public_headers_must_be_c89"
$(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 )
$(E) "[RUN] Testing badreq_bad_client_test"
@ -2566,6 +2570,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -2842,6 +2847,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -3107,6 +3113,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -3300,6 +3307,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -3640,6 +3648,7 @@ LIBGRPC++_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -4275,6 +4284,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@ -10805,6 +10815,38 @@ endif
endif
WAKEUP_FD_CV_TEST_SRC = \
test/core/iomgr/wakeup_fd_cv_test.c \
WAKEUP_FD_CV_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WAKEUP_FD_CV_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/wakeup_fd_cv_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep)
endif
endif
ALARM_CPP_TEST_SRC = \
test/cpp/common/alarm_cpp_test.cc \

@ -617,6 +617,7 @@
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',

@ -217,6 +217,7 @@ filegroups:
- src/core/lib/iomgr/timer_heap.h
- src/core/lib/iomgr/udp_server.h
- src/core/lib/iomgr/unix_sockets_posix.h
- src/core/lib/iomgr/wakeup_fd_cv.h
- src/core/lib/iomgr/wakeup_fd_pipe.h
- src/core/lib/iomgr/wakeup_fd_posix.h
- src/core/lib/iomgr/workqueue.h
@ -299,6 +300,7 @@ filegroups:
- src/core/lib/iomgr/udp_server.c
- src/core/lib/iomgr/unix_sockets_posix.c
- src/core/lib/iomgr/unix_sockets_posix_noop.c
- src/core/lib/iomgr/wakeup_fd_cv.c
- src/core/lib/iomgr/wakeup_fd_eventfd.c
- src/core/lib/iomgr/wakeup_fd_nospecial.c
- src/core/lib/iomgr/wakeup_fd_pipe.c
@ -2535,6 +2537,20 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: wakeup_fd_cv_test
build: test
language: c
src:
- test/core/iomgr/wakeup_fd_cv_test.c
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
platforms:
- mac
- linux
- posix
- name: alarm_cpp_test
gtest: true
build: test

@ -136,6 +136,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \

@ -306,6 +306,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/timer_heap.h',
'src/core/lib/iomgr/udp_server.h',
'src/core/lib/iomgr/unix_sockets_posix.h',
'src/core/lib/iomgr/wakeup_fd_cv.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/workqueue.h',
@ -467,6 +468,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
@ -668,6 +670,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/timer_heap.h',
'src/core/lib/iomgr/udp_server.h',
'src/core/lib/iomgr/unix_sockets_posix.h',
'src/core/lib/iomgr/wakeup_fd_cv.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/workqueue.h',

@ -226,6 +226,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/timer_heap.h )
s.files += %w( src/core/lib/iomgr/udp_server.h )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_posix.h )
s.files += %w( src/core/lib/iomgr/workqueue.h )
@ -387,6 +388,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/udp_server.c )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix.c )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix_noop.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_eventfd.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_nospecial.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.c )

@ -233,6 +233,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/timer_heap.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/workqueue.h" role="src" />
@ -394,6 +395,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix_noop.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_eventfd.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_nospecial.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.c" role="src" />

@ -0,0 +1,328 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_WAKEUP_FD
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include <errno.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_posix.h"
#define MAX_TABLE_RESIZE 256
#define DEFAULT_TABLE_SIZE 16
#define POLL_PERIOD_MS 1000
#define FD_TO_IDX(fd) (-(fd)-1)
#define IDX_TO_FD(idx) (-(idx)-1)
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
} cv_node;
typedef struct fd_node {
int is_set;
cv_node* cvs;
struct fd_node* next_free;
} fd_node;
typedef struct cv_fd_table {
fd_node* cvfds;
fd_node* free_fds;
unsigned int size;
grpc_poll_function_type poll;
} cv_fd_table;
typedef struct poll_result {
struct pollfd* fds;
gpr_cv* cv;
int completed;
int res;
int err;
} poll_result;
typedef struct poll_args {
struct pollfd* fds;
nfds_t nfds;
int timeout;
poll_result* result;
} poll_args;
static gpr_mu g_mu = PTHREAD_MUTEX_INITIALIZER;
static cv_fd_table g_cvfds;
// Some environments do not implement pthread_cancel(), so we run
// this poll in a detached thread, and wake up periodically and
// check if the calling thread is still waiting on a result
static void run_poll(void* arg) {
int result, timeout;
poll_args* pargs = (poll_args*)arg;
gpr_mu_lock(&g_mu);
if (pargs->result != NULL) {
while (pargs->result != NULL) {
if (pargs->timeout < 0) {
timeout = POLL_PERIOD_MS;
} else {
timeout = GPR_MIN(POLL_PERIOD_MS, pargs->timeout);
pargs->timeout -= timeout;
}
gpr_mu_unlock(&g_mu);
result = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
gpr_mu_lock(&g_mu);
if (pargs->result != NULL) {
if (result != 0 || pargs->timeout == 0) {
memcpy(pargs->result->fds, pargs->fds,
sizeof(struct pollfd) * pargs->nfds);
pargs->result->res = result;
pargs->result->err = errno;
pargs->result->completed = 1;
gpr_cv_signal(pargs->result->cv);
break;
}
}
}
}
gpr_free(pargs->fds);
gpr_free(pargs);
gpr_mu_unlock(&g_mu);
}
int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
cv_node *cvn, *prev;
struct pollfd* sockfds;
nfds_t nsockfds = 0;
gpr_cv pollcv;
gpr_thd_id t_id;
gpr_thd_options opt;
poll_args* pargs;
poll_result* pres;
gpr_mu_lock(&g_mu);
gpr_cv_init(&pollcv);
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
idx = FD_TO_IDX(fds[i].fd);
cvn = gpr_malloc(sizeof(cv_node));
cvn->cv = &pollcv;
cvn->next = g_cvfds.cvfds[idx].cvs;
g_cvfds.cvfds[idx].cvs = cvn;
// We should return immediately if there are pending events,
// but we still need to call poll() to check for socket events
if (g_cvfds.cvfds[idx].is_set) {
timeout = 0;
}
} else if (fds[i].fd >= 0) {
nsockfds++;
}
}
sockfds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
sockfds[idx].fd = fds[i].fd;
sockfds[idx].events = fds[i].events;
sockfds[idx].revents = 0;
idx++;
}
}
errno = 0;
if (nsockfds > 0) {
pres = gpr_malloc(sizeof(struct poll_result));
pargs = gpr_malloc(sizeof(struct poll_args));
pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
memcpy(pargs->fds, sockfds, sizeof(struct pollfd) * nsockfds);
pargs->nfds = nsockfds;
pargs->timeout = timeout;
pargs->result = pres;
pres->fds = sockfds;
pres->cv = &pollcv;
pres->completed = 0;
pres->res = 0;
pres->err = 0;
opt = gpr_thd_options_default();
gpr_thd_options_set_detached(&opt);
gpr_thd_new(&t_id, &run_poll, pargs, &opt);
// We want the poll() thread to trigger the deadline, so wait forever here
gpr_cv_wait(&pollcv, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
if (!pres->completed) {
pargs->result = NULL;
}
res = pres->res;
errno = pres->err;
gpr_free(pres);
} else {
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
gpr_cv_wait(&pollcv, &g_mu, deadline);
res = 0;
}
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
prev = NULL;
while (cvn->cv != &pollcv) {
prev = cvn;
cvn = cvn->next;
GPR_ASSERT(cvn);
}
if (!prev) {
g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
} else {
prev->next = cvn->next;
}
gpr_free(cvn);
if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
} else if (fds[i].fd >= 0) {
fds[i].revents = sockfds[idx].revents;
idx++;
}
}
gpr_free(sockfds);
gpr_cv_destroy(&pollcv);
gpr_mu_unlock(&g_mu);
return res;
}
static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
unsigned int i, newsize;
int idx;
gpr_mu_lock(&g_mu);
if (!g_cvfds.free_fds) {
newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
for (i = g_cvfds.size; i < newsize; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
g_cvfds.free_fds = &g_cvfds.cvfds[i];
}
g_cvfds.size = newsize;
}
idx = (int)(g_cvfds.free_fds - g_cvfds.cvfds);
g_cvfds.free_fds = g_cvfds.free_fds->next_free;
g_cvfds.cvfds[idx].cvs = NULL;
g_cvfds.cvfds[idx].is_set = 0;
fd_info->read_fd = IDX_TO_FD(idx);
fd_info->write_fd = -1;
gpr_mu_unlock(&g_mu);
return GRPC_ERROR_NONE;
}
void grpc_global_cv_fd_table_init() {
gpr_mu_lock(&g_mu);
g_cvfds.size = DEFAULT_TABLE_SIZE;
g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = NULL;
for (int i = 0; i < DEFAULT_TABLE_SIZE; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
g_cvfds.free_fds = &g_cvfds.cvfds[i];
}
// Override the poll function with one that supports cvfds
g_cvfds.poll = grpc_poll_function;
grpc_poll_function = &cvfd_poll;
gpr_mu_unlock(&g_mu);
}
void grpc_global_cv_fd_table_shutdown() {
gpr_mu_lock(&g_mu);
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
gpr_mu_unlock(&g_mu);
}
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
cv_node* cvn;
gpr_mu_lock(&g_mu);
g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1;
cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs;
while (cvn) {
gpr_cv_signal(cvn->cv);
cvn = cvn->next;
}
gpr_mu_unlock(&g_mu);
return GRPC_ERROR_NONE;
}
static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_mu);
g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0;
gpr_mu_unlock(&g_mu);
return GRPC_ERROR_NONE;
}
static void cv_fd_destroy(grpc_wakeup_fd* fd_info) {
if (fd_info->read_fd == 0) {
return;
}
gpr_mu_lock(&g_mu);
// Assert that there are no active pollers
GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs);
g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)];
gpr_mu_unlock(&g_mu);
}
static int cv_check_availability(void) { return 1; }
const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = {
cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy,
cv_check_availability};
#endif /* GPR_POSIX_WAKUP_FD */

@ -0,0 +1,58 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
* wakeup_fd_cv uses condition variables to implement wakeup fds.
*
* It is intended for use only in cases when eventfd() and pipe() are not
* available. It can only be used with the "poll" engine.
*
* Implementation:
* A global table of cv wakeup fds is mantained. A cv wakeup fd is a negative
* file descriptor. poll() is then run in a background thread with only the
* real socket fds while we wait on a condition variable trigged by either the
* poll() called or a wakeup_fd() call.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
void grpc_global_cv_fd_table_init();
void grpc_global_cv_fd_table_shutdown();
extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */

@ -95,8 +95,13 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) {
}
static int pipe_check_availability(void) {
/* Assume that pipes are always available. */
return 1;
grpc_wakeup_fd fd;
if (pipe_init(&fd) == GRPC_ERROR_NONE) {
pipe_destroy(&fd);
return 1;
} else {
return 0;
}
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {

@ -36,22 +36,33 @@
#ifdef GPR_POSIX_WAKEUP_FD
#include <stddef.h>
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_pipe.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
int grpc_allow_specialized_wakeup_fd = 1;
int grpc_allow_pipe_wakeup_fd = 1;
void grpc_wakeup_fd_global_init(void) {
if (grpc_allow_specialized_wakeup_fd &&
grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
} else {
} else if (grpc_allow_pipe_wakeup_fd &&
grpc_pipe_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
} else {
wakeup_fd_vtable = &grpc_cv_wakeup_fd_vtable;
grpc_global_cv_fd_table_init();
}
}
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
void grpc_wakeup_fd_global_destroy(void) {
if (wakeup_fd_vtable == &grpc_cv_wakeup_fd_vtable) {
grpc_global_cv_fd_table_shutdown();
}
wakeup_fd_vtable = NULL;
}
grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
return wakeup_fd_vtable->init(fd_info);

@ -88,6 +88,7 @@ struct grpc_wakeup_fd {
};
extern int grpc_allow_specialized_wakeup_fd;
extern int grpc_allow_pipe_wakeup_fd;
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)

@ -130,6 +130,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',

@ -0,0 +1,246 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <pthread.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/support/env.h"
typedef struct poll_args {
struct pollfd *fds;
nfds_t nfds;
int timeout;
int result;
} poll_args;
gpr_cv poll_cv;
gpr_mu poll_mu;
static int socket_event = 0;
// Trigger a "socket" POLLIN in mock_poll()
void trigger_socket_event() {
gpr_mu_lock(&poll_mu);
socket_event = 1;
gpr_cv_broadcast(&poll_cv);
gpr_mu_unlock(&poll_mu);
}
void reset_socket_event() {
gpr_mu_lock(&poll_mu);
socket_event = 0;
gpr_mu_unlock(&poll_mu);
}
// Mocks posix poll() function
int mock_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int res = 0;
gpr_timespec poll_time;
gpr_mu_lock(&poll_mu);
GPR_ASSERT(nfds == 3);
GPR_ASSERT(fds[0].fd == 20);
GPR_ASSERT(fds[1].fd == 30);
GPR_ASSERT(fds[2].fd == 50);
GPR_ASSERT(fds[0].events == (POLLIN | POLLHUP));
GPR_ASSERT(fds[1].events == (POLLIN | POLLHUP));
GPR_ASSERT(fds[2].events == POLLIN);
if (timeout < 0) {
poll_time = gpr_inf_future(GPR_CLOCK_REALTIME);
} else {
poll_time = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(timeout, GPR_TIMESPAN));
}
if (socket_event || !gpr_cv_wait(&poll_cv, &poll_mu, poll_time)) {
fds[0].revents = POLLIN;
res = 1;
}
gpr_mu_unlock(&poll_mu);
return res;
}
void background_poll(void *args) {
poll_args *pargs = (poll_args *)args;
pargs->result = grpc_poll_function(pargs->fds, pargs->nfds, pargs->timeout);
}
void test_many_fds(void) {
int i;
grpc_wakeup_fd_global_init();
grpc_wakeup_fd fd[1000];
for (i = 0; i < 1000; i++) {
GPR_ASSERT(grpc_wakeup_fd_init(&fd[i]) == GRPC_ERROR_NONE);
}
for (i = 0; i < 1000; i++) {
grpc_wakeup_fd_destroy(&fd[i]);
}
grpc_wakeup_fd_global_destroy();
}
void test_poll_cv_trigger(void) {
grpc_wakeup_fd cvfd1, cvfd2, cvfd3;
struct pollfd pfds[6];
poll_args pargs;
gpr_thd_id t_id;
gpr_thd_options opt;
grpc_poll_function = &mock_poll;
grpc_wakeup_fd_global_init();
GPR_ASSERT(grpc_wakeup_fd_init(&cvfd1) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_init(&cvfd2) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_init(&cvfd3) == GRPC_ERROR_NONE);
GPR_ASSERT(cvfd1.read_fd < 0);
GPR_ASSERT(cvfd2.read_fd < 0);
GPR_ASSERT(cvfd3.read_fd < 0);
GPR_ASSERT(cvfd1.read_fd != cvfd2.read_fd);
GPR_ASSERT(cvfd2.read_fd != cvfd3.read_fd);
GPR_ASSERT(cvfd1.read_fd != cvfd3.read_fd);
pfds[0].fd = cvfd1.read_fd;
pfds[1].fd = cvfd2.read_fd;
pfds[2].fd = 20;
pfds[3].fd = 30;
pfds[4].fd = cvfd3.read_fd;
pfds[5].fd = 50;
pfds[0].events = 0;
pfds[1].events = POLLIN;
pfds[2].events = POLLIN | POLLHUP;
pfds[3].events = POLLIN | POLLHUP;
pfds[4].events = POLLIN;
pfds[5].events = POLLIN;
pargs.fds = pfds;
pargs.nfds = 6;
pargs.timeout = 1000;
pargs.result = -2;
opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
// Wakeup wakeup_fd not listening for events
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on socket fd
pargs.timeout = -1;
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
trigger_socket_event();
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == POLLIN);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on wakeup fd
reset_socket_event();
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 1);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// Pollin on wakeup fd + socket fd
trigger_socket_event();
pargs.result = -2;
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 2);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == POLLIN);
GPR_ASSERT(pfds[2].revents == POLLIN);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
// No Events
pargs.result = -2;
pargs.timeout = 1000;
reset_socket_event();
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE);
gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
gpr_thd_join(t_id);
GPR_ASSERT(pargs.result == 0);
GPR_ASSERT(pfds[0].revents == 0);
GPR_ASSERT(pfds[1].revents == 0);
GPR_ASSERT(pfds[2].revents == 0);
GPR_ASSERT(pfds[3].revents == 0);
GPR_ASSERT(pfds[4].revents == 0);
GPR_ASSERT(pfds[5].revents == 0);
grpc_wakeup_fd_global_destroy();
}
int main(int argc, char **argv) {
gpr_setenv("GRPC_POLL_STRATEGY", "poll");
grpc_allow_specialized_wakeup_fd = 0;
grpc_allow_pipe_wakeup_fd = 0;
gpr_mu_init(&poll_mu);
gpr_cv_init(&poll_cv);
test_many_fds();
test_poll_cv_trigger();
// Make sure detached polling threads have chance
// to exit and clean up memory. pthread_exit() causes tsan/msan
// issues, so we just wait an ample amount of time
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(5, GPR_TIMESPAN)));
return 0;
}

@ -925,6 +925,7 @@ src/core/lib/iomgr/timer.h \
src/core/lib/iomgr/timer_heap.h \
src/core/lib/iomgr/udp_server.h \
src/core/lib/iomgr/unix_sockets_posix.h \
src/core/lib/iomgr/wakeup_fd_cv.h \
src/core/lib/iomgr/wakeup_fd_pipe.h \
src/core/lib/iomgr/wakeup_fd_posix.h \
src/core/lib/iomgr/workqueue.h \
@ -1039,6 +1040,7 @@ src/core/lib/iomgr/timer_heap.c \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \

@ -842,6 +842,7 @@ src/core/lib/iomgr/timer.h \
src/core/lib/iomgr/timer_heap.h \
src/core/lib/iomgr/udp_server.h \
src/core/lib/iomgr/unix_sockets_posix.h \
src/core/lib/iomgr/wakeup_fd_cv.h \
src/core/lib/iomgr/wakeup_fd_pipe.h \
src/core/lib/iomgr/wakeup_fd_posix.h \
src/core/lib/iomgr/workqueue.h \
@ -1003,6 +1004,7 @@ src/core/lib/iomgr/timer_heap.c \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \

@ -1891,6 +1891,22 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "wakeup_fd_cv_test",
"src": [
"test/core/iomgr/wakeup_fd_cv_test.c"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -5941,6 +5957,7 @@
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@ -6085,6 +6102,8 @@
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
"src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",

@ -1977,6 +1977,25 @@
"windows"
]
},
{
"args": [],
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "wakeup_fd_cv_test",
"platforms": [
"linux",
"mac",
"posix"
]
},
{
"args": [],
"ci_platforms": [

@ -425,6 +425,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@ -627,6 +628,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">

@ -259,6 +259,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -869,6 +872,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -421,6 +421,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@ -613,6 +614,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">

@ -244,6 +244,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -842,6 +845,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -351,6 +351,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@ -568,6 +569,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">

@ -163,6 +163,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -821,6 +824,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -242,6 +242,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@ -412,6 +413,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">

@ -211,6 +211,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -602,6 +605,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -341,6 +341,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@ -536,6 +537,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">

@ -166,6 +166,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -731,6 +734,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save