Use backup pollser instead of connectivity watcher

pull/12732/head
Yuchen Zeng 7 years ago
parent d9ce7d9233
commit 0bad30a244
  1. 6
      BUILD
  2. 18
      CMakeLists.txt
  3. 18
      Makefile
  4. 3
      binding.gyp
  5. 6
      build.yaml
  6. 3
      config.m4
  7. 3
      config.w32
  8. 9
      gRPC-Core.podspec
  9. 6
      grpc.gemspec
  10. 12
      grpc.gyp
  11. 6
      package.xml
  12. 143
      src/core/ext/filters/client_channel/backup_poller.cc
  13. 14
      src/core/ext/filters/client_channel/backup_poller.h
  14. 160
      src/core/ext/filters/client_channel/channel_connectivity.cc
  15. 195
      src/core/ext/filters/client_channel/channel_connectivity_internal.cc
  16. 33
      src/core/ext/filters/client_channel/channel_connectivity_internal.h
  17. 6
      src/core/ext/filters/client_channel/client_channel.cc
  18. 179
      src/core/ext/filters/client_channel/connectivity_watcher.cc
  19. 3
      src/python/grpcio/grpc_core_dependencies.py
  20. 5
      test/cpp/end2end/async_end2end_test.cc
  21. 5
      test/cpp/end2end/end2end_test.cc
  22. 6
      tools/doxygen/Doxyfile.core.internal
  23. 9
      tools/run_tests/generated/sources_and_headers.json

@ -872,12 +872,11 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_client_channel",
srcs = [
"src/core/ext/filters/client_channel/channel_connectivity_internal.cc",
"src/core/ext/filters/client_channel/backup_poller.cc",
"src/core/ext/filters/client_channel/channel_connectivity.cc",
"src/core/ext/filters/client_channel/client_channel.cc",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/connectivity_watcher.cc",
"src/core/ext/filters/client_channel/connector.cc",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
"src/core/ext/filters/client_channel/http_proxy.cc",
@ -896,10 +895,9 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/uri_parser.cc",
],
hdrs = [
"src/core/ext/filters/client_channel/channel_connectivity_internal.h",
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/connectivity_watcher.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/http_connect_handshaker.h",
"src/core/ext/filters/client_channel/http_proxy.h",

@ -1153,12 +1153,11 @@ add_library(grpc
src/core/tsi/transport_security_adapter.cc
src/core/ext/transport/chttp2/server/chttp2_server.cc
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
@ -1479,12 +1478,11 @@ add_library(grpc_cronet
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
@ -1771,12 +1769,11 @@ add_library(grpc_test_util
src/core/lib/transport/transport.cc
src/core/lib/transport/transport_op_string.cc
src/core/lib/debug/trace.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
@ -2038,12 +2035,11 @@ add_library(grpc_test_util_unsecure
src/core/lib/transport/transport.cc
src/core/lib/transport/transport_op_string.cc
src/core/lib/debug/trace.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
@ -2324,12 +2320,11 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/client/insecure/channel_create.cc
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
src/core/ext/transport/chttp2/client/chttp2_connector.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
@ -3061,12 +3056,11 @@ add_library(grpc++_cronet
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/client_channel/backup_poller.cc
src/core/ext/filters/client_channel/channel_connectivity.cc
src/core/ext/filters/client_channel/channel_connectivity_internal.cc
src/core/ext/filters/client_channel/client_channel.cc
src/core/ext/filters/client_channel/client_channel_factory.cc
src/core/ext/filters/client_channel/client_channel_plugin.cc
src/core/ext/filters/client_channel/connectivity_watcher.cc
src/core/ext/filters/client_channel/connector.cc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc

@ -3153,12 +3153,11 @@ LIBGRPC_SRC = \
src/core/tsi/transport_security_adapter.cc \
src/core/ext/transport/chttp2/server/chttp2_server.cc \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
@ -3478,12 +3477,11 @@ LIBGRPC_CRONET_SRC = \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
@ -3768,12 +3766,11 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport_op_string.cc \
src/core/lib/debug/trace.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
@ -4025,12 +4022,11 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport_op_string.cc \
src/core/lib/debug/trace.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
@ -4288,12 +4284,11 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/client/insecure/channel_create.cc \
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc \
src/core/ext/transport/chttp2/client/chttp2_connector.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
@ -5003,12 +4998,11 @@ LIBGRPC++_CRONET_SRC = \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \

@ -853,12 +853,11 @@
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',

@ -463,10 +463,9 @@ filegroups:
- grpc_trace_headers
- name: grpc_client_channel
headers:
- src/core/ext/filters/client_channel/channel_connectivity_internal.h
- src/core/ext/filters/client_channel/backup_poller.h
- src/core/ext/filters/client_channel/client_channel.h
- src/core/ext/filters/client_channel/client_channel_factory.h
- src/core/ext/filters/client_channel/connectivity_watcher.h
- src/core/ext/filters/client_channel/connector.h
- src/core/ext/filters/client_channel/http_connect_handshaker.h
- src/core/ext/filters/client_channel/http_proxy.h
@ -484,12 +483,11 @@ filegroups:
- src/core/ext/filters/client_channel/subchannel_index.h
- src/core/ext/filters/client_channel/uri_parser.h
src:
- src/core/ext/filters/client_channel/backup_poller.cc
- src/core/ext/filters/client_channel/channel_connectivity.cc
- src/core/ext/filters/client_channel/channel_connectivity_internal.cc
- src/core/ext/filters/client_channel/client_channel.cc
- src/core/ext/filters/client_channel/client_channel_factory.cc
- src/core/ext/filters/client_channel/client_channel_plugin.cc
- src/core/ext/filters/client_channel/connectivity_watcher.cc
- src/core/ext/filters/client_channel/connector.cc
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
- src/core/ext/filters/client_channel/http_proxy.cc

@ -278,12 +278,11 @@ if test "$PHP_GRPC" != "no"; then
src/core/tsi/transport_security_adapter.cc \
src/core/ext/transport/chttp2/server/chttp2_server.cc \
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \

@ -255,12 +255,11 @@ if (PHP_GRPC != "no") {
"src\\core\\tsi\\transport_security_adapter.cc " +
"src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.cc " +
"src\\core\\ext\\transport\\chttp2\\client\\secure\\secure_channel_create.cc " +
"src\\core\\ext\\filters\\client_channel\\backup_poller.cc " +
"src\\core\\ext\\filters\\client_channel\\channel_connectivity.cc " +
"src\\core\\ext\\filters\\client_channel\\channel_connectivity_internal.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_factory.cc " +
"src\\core\\ext\\filters\\client_channel\\client_channel_plugin.cc " +
"src\\core\\ext\\filters\\client_channel\\connectivity_watcher.cc " +
"src\\core\\ext\\filters\\client_channel\\connector.cc " +
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +
"src\\core\\ext\\filters\\client_channel\\http_proxy.cc " +

@ -299,10 +299,9 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/channel_connectivity_internal.h',
'src/core/ext/filters/client_channel/backup_poller.h',
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/connectivity_watcher.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
'src/core/ext/filters/client_channel/http_proxy.h',
@ -670,12 +669,11 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
@ -806,10 +804,9 @@ Pod::Spec.new do |s|
'src/core/tsi/transport_security_adapter.h',
'src/core/tsi/transport_security_interface.h',
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/filters/client_channel/channel_connectivity_internal.h',
'src/core/ext/filters/client_channel/backup_poller.h',
'src/core/ext/filters/client_channel/client_channel.h',
'src/core/ext/filters/client_channel/client_channel_factory.h',
'src/core/ext/filters/client_channel/connectivity_watcher.h',
'src/core/ext/filters/client_channel/connector.h',
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
'src/core/ext/filters/client_channel/http_proxy.h',

@ -230,10 +230,9 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/transport_security_adapter.h )
s.files += %w( src/core/tsi/transport_security_interface.h )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.h )
s.files += %w( src/core/ext/filters/client_channel/channel_connectivity_internal.h )
s.files += %w( src/core/ext/filters/client_channel/backup_poller.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel.h )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.h )
s.files += %w( src/core/ext/filters/client_channel/connectivity_watcher.h )
s.files += %w( src/core/ext/filters/client_channel/connector.h )
s.files += %w( src/core/ext/filters/client_channel/http_connect_handshaker.h )
s.files += %w( src/core/ext/filters/client_channel/http_proxy.h )
@ -605,12 +604,11 @@ Gem::Specification.new do |s|
s.files += %w( src/core/tsi/transport_security_adapter.cc )
s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.cc )
s.files += %w( src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc )
s.files += %w( src/core/ext/filters/client_channel/backup_poller.cc )
s.files += %w( src/core/ext/filters/client_channel/channel_connectivity.cc )
s.files += %w( src/core/ext/filters/client_channel/channel_connectivity_internal.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_factory.cc )
s.files += %w( src/core/ext/filters/client_channel/client_channel_plugin.cc )
s.files += %w( src/core/ext/filters/client_channel/connectivity_watcher.cc )
s.files += %w( src/core/ext/filters/client_channel/connector.cc )
s.files += %w( src/core/ext/filters/client_channel/http_connect_handshaker.cc )
s.files += %w( src/core/ext/filters/client_channel/http_proxy.cc )

@ -419,12 +419,11 @@
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
@ -663,12 +662,11 @@
'src/core/lib/transport/transport.cc',
'src/core/lib/transport/transport_op_string.cc',
'src/core/lib/debug/trace.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
@ -872,12 +870,11 @@
'src/core/lib/transport/transport.cc',
'src/core/lib/transport/transport_op_string.cc',
'src/core/lib/debug/trace.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
@ -1099,12 +1096,11 @@
'src/core/ext/transport/chttp2/client/insecure/channel_create.cc',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',

@ -242,10 +242,9 @@
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.h" role="src" />
<file baseinstalldir="/" name="src/core/tsi/transport_security_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/backup_poller.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connectivity_watcher.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_connect_handshaker.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.h" role="src" />
@ -617,12 +616,11 @@
<file baseinstalldir="/" name="src/core/tsi/transport_security_adapter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/chttp2_server.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/backup_poller.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/channel_connectivity_internal.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/client_channel_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connectivity_watcher.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/connector.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_connect_handshaker.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.cc" role="src" />

@ -0,0 +1,143 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#define DEFAULT_POLLING_INTERVAL_MS 500
typedef struct backup_poller {
grpc_timer polling_timer;
grpc_closure run_poller_closure;
grpc_closure shutdown_closure;
gpr_mu* pollset_mu;
grpc_pollset* pollset;
gpr_refcount refs;
gpr_refcount shutdown_refs;
} backup_poller;
static gpr_once g_once = GPR_ONCE_INIT;
static gpr_mu g_poller_mu;
static backup_poller* g_poller = NULL;
static void init_g_poller_mu() { gpr_mu_init(&g_poller_mu); }
static bool is_disabled() {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_backup_poller");
bool disabled = gpr_is_true(env);
gpr_free(env);
return disabled;
}
static bool backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx,
backup_poller* p) {
if (gpr_unref(&p->shutdown_refs)) {
grpc_pollset_destroy(exec_ctx, p->pollset);
gpr_free(p->pollset);
gpr_free(p);
}
return true;
}
static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg);
}
static void g_poller_unref(grpc_exec_ctx* exec_ctx) {
if (gpr_unref(&g_poller->refs)) {
gpr_mu_lock(&g_poller_mu);
backup_poller* p = g_poller;
g_poller = NULL;
gpr_mu_unlock(&g_poller_mu);
grpc_timer_cancel(exec_ctx, &p->polling_timer);
gpr_mu_lock(p->pollset_mu);
grpc_pollset_shutdown(exec_ctx, p->pollset,
GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller,
p, grpc_schedule_on_exec_ctx));
gpr_mu_unlock(p->pollset_mu);
}
}
static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
backup_poller* p = (backup_poller*)arg;
if (error != GRPC_ERROR_NONE) {
if (error != GRPC_ERROR_CANCELLED) {
GRPC_LOG_IF_ERROR("check_connectivity_state", error);
}
backup_poller_shutdown_unref(exec_ctx, p);
return;
}
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(p->pollset_mu);
grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL, now,
gpr_inf_past(GPR_CLOCK_MONOTONIC));
gpr_mu_unlock(p->pollset_mu);
GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
grpc_timer_init(
exec_ctx, &p->polling_timer,
gpr_time_add(
now, gpr_time_from_millis(DEFAULT_POLLING_INTERVAL_MS, GPR_TIMESPAN)),
&p->run_poller_closure, now);
}
void grpc_client_channel_start_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
if (is_disabled()) return;
gpr_once_init(&g_once, init_g_poller_mu);
gpr_mu_lock(&g_poller_mu);
if (g_poller == NULL) {
g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller));
g_poller->pollset = (grpc_pollset*)gpr_malloc(grpc_pollset_size());
grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu);
gpr_ref_init(&g_poller->refs, 0);
// one for timer cancellation, one for pollset shutdown
gpr_ref_init(&g_poller->shutdown_refs, 2);
GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
grpc_schedule_on_exec_ctx);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_timer_init(
exec_ctx, &g_poller->polling_timer,
gpr_time_add(now, gpr_time_from_millis(DEFAULT_POLLING_INTERVAL_MS,
GPR_TIMESPAN)),
&g_poller->run_poller_closure, now);
}
gpr_ref(&g_poller->refs);
gpr_mu_unlock(&g_poller_mu);
grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset);
}
void grpc_client_channel_stop_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
if (is_disabled()) return;
grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset);
g_poller_unref(exec_ctx);
}

@ -16,8 +16,8 @@
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
@ -25,8 +25,10 @@
/* Constantly watches client channel connectivity status to reconnect a
* transiently disconnected channel */
void grpc_client_channel_start_watching_connectivity(
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
grpc_channel_stack* channel_stack);
void grpc_client_channel_start_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H */
void grpc_client_channel_stop_backup_polling(
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */

@ -23,8 +23,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
@ -52,6 +52,125 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
return GRPC_CHANNEL_SHUTDOWN;
}
typedef enum {
WAITING,
READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
} callback_phase;
typedef struct {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel *channel;
grpc_error *error;
void *tag;
} state_watcher;
static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
"watch_channel_connectivity");
} else {
abort();
}
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
grpc_cq_completion *ignored) {
bool should_delete = false;
state_watcher *w = (state_watcher *)pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED:
should_delete = true;
break;
}
gpr_mu_unlock(&w->mu);
if (should_delete) {
delete_state_watcher(exec_ctx, w);
}
}
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
bool due_to_completion, grpc_error *error) {
if (due_to_completion) {
grpc_timer_cancel(exec_ctx, &w->alarm);
} else {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
grpc_client_channel_watch_connectivity_state(
exec_ctx, client_channel_elem,
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
&w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
if (due_to_completion) {
if (GRPC_TRACER_ON(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change");
} else if (error == GRPC_ERROR_CANCELLED) {
error = GRPC_ERROR_NONE;
}
}
switch (w->phase) {
case WAITING:
GRPC_ERROR_REF(error);
w->error = error;
w->phase = READY_TO_CALL_BACK;
break;
case READY_TO_CALL_BACK:
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!due_to_completion);
GRPC_ERROR_UNREF(w->error);
GRPC_ERROR_REF(error);
w->error = error;
}
w->phase = CALLING_BACK_AND_FINISHED;
grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
&w->completion_storage);
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
break;
}
gpr_mu_unlock(&w->mu);
GRPC_ERROR_UNREF(error);
}
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
}
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
}
int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
@ -83,10 +202,10 @@ int grpc_channel_support_connectivity_watcher(grpc_channel *channel) {
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(channel_stack);
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w));
GRPC_API_TRACE(
"grpc_channel_watch_connectivity_state("
@ -96,8 +215,37 @@ void grpc_channel_watch_connectivity_state(
"cq=%p, tag=%p)",
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
grpc_channel_watch_connectivity_state_internal(
&exec_ctx, client_channel_elem, channel_stack, last_observed_state,
deadline, cq, tag);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
w->tag = tag;
w->channel = channel;
w->error = NULL;
watcher_timer_init_arg *wa =
(watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg));
wa->w = w;
wa->deadline = deadline;
GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
grpc_schedule_on_exec_ctx);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
&exec_ctx, client_channel_elem,
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
&w->on_complete, &w->watcher_timer_init);
} else {
abort();
}
grpc_exec_ctx_finish(&exec_ctx);
}

@ -1,195 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/completion_queue.h"
typedef enum {
WAITING,
READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
} callback_phase;
typedef struct {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel_element *client_channel_elem;
grpc_channel_stack *channel_stack;
grpc_error *error;
void *tag;
} state_watcher;
static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->channel_stack,
"watch_channel_connectivity");
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
grpc_cq_completion *ignored) {
bool should_delete = false;
state_watcher *w = (state_watcher *)pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED:
should_delete = true;
break;
}
gpr_mu_unlock(&w->mu);
if (should_delete) {
delete_state_watcher(exec_ctx, w);
}
}
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
bool due_to_completion, grpc_error *error) {
if (due_to_completion) {
grpc_timer_cancel(exec_ctx, &w->alarm);
} else {
grpc_channel_element *client_channel_elem = w->client_channel_elem;
grpc_client_channel_watch_connectivity_state(
exec_ctx, client_channel_elem,
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
&w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
if (due_to_completion) {
if (GRPC_TRACER_ON(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change");
} else if (error == GRPC_ERROR_CANCELLED) {
error = GRPC_ERROR_NONE;
}
}
switch (w->phase) {
case WAITING:
GRPC_ERROR_REF(error);
w->error = error;
w->phase = READY_TO_CALL_BACK;
break;
case READY_TO_CALL_BACK:
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!due_to_completion);
GRPC_ERROR_UNREF(w->error);
GRPC_ERROR_REF(error);
w->error = error;
}
w->phase = CALLING_BACK_AND_FINISHED;
grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
&w->completion_storage);
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
break;
}
gpr_mu_unlock(&w->mu);
GRPC_ERROR_UNREF(error);
}
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
}
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
}
typedef struct watcher_timer_init_arg {
state_watcher *w;
gpr_timespec deadline;
} watcher_timer_init_arg;
static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg;
grpc_timer_init(exec_ctx, &wa->w->alarm,
gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC),
&wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_free(wa);
}
void grpc_channel_watch_connectivity_state_internal(
grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem,
grpc_channel_stack *channel_stack,
grpc_connectivity_state last_observed_state, gpr_timespec deadline,
grpc_completion_queue *cq, void *tag) {
state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w));
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
w->tag = tag;
w->client_channel_elem = client_channel_elem;
w->channel_stack = channel_stack;
w->error = NULL;
watcher_timer_init_arg *wa =
(watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg));
wa->w = w;
wa->deadline = deadline;
GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
grpc_schedule_on_exec_ctx);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_STACK_REF(channel_stack, "watch_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
exec_ctx, client_channel_elem,
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
&w->on_complete, &w->watcher_timer_init);
} else {
abort();
}
}

@ -1,33 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/exec_ctx.h"
void grpc_channel_watch_connectivity_state_internal(
grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem,
grpc_channel_stack *channel_stack,
grpc_connectivity_state last_observed_state, gpr_timespec deadline,
grpc_completion_queue *cq, void *tag);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H \
*/

@ -31,7 +31,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/ext/filters/client_channel/connectivity_watcher.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
@ -754,8 +754,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
}
chand->deadline_checking_enabled =
grpc_deadline_checking_enabled(args->channel_args);
grpc_client_channel_start_watching_connectivity(exec_ctx, elem,
chand->owning_stack);
grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties);
return GRPC_ERROR_NONE;
}
@ -793,6 +792,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");

@ -1,179 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/ext/filters/client_channel/connectivity_watcher.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h"
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#define DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS 500
typedef struct connectivity_watcher {
grpc_timer watcher_timer;
grpc_closure check_connectivity_closure;
grpc_completion_queue* cq;
gpr_refcount refs;
size_t channel_count;
bool shutting_down;
} connectivity_watcher;
typedef struct channel_state {
grpc_channel_element* client_channel_elem;
grpc_channel_stack* channel_stack;
grpc_connectivity_state state;
} channel_state;
static gpr_once g_once = GPR_ONCE_INIT;
static gpr_mu g_watcher_mu;
static connectivity_watcher* g_watcher = NULL;
static void init_g_watcher_mu() { gpr_mu_init(&g_watcher_mu); }
static void start_watching_locked(grpc_exec_ctx* exec_ctx,
grpc_channel_element* client_channel_elem,
grpc_channel_stack* channel_stack) {
gpr_ref(&g_watcher->refs);
++g_watcher->channel_count;
channel_state* s = (channel_state*)gpr_zalloc(sizeof(channel_state));
s->client_channel_elem = client_channel_elem;
s->channel_stack = channel_stack;
s->state = GRPC_CHANNEL_IDLE;
grpc_channel_watch_connectivity_state_internal(
exec_ctx, client_channel_elem, channel_stack, s->state,
gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, (void*)s);
}
static bool is_disabled() {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = gpr_is_true(env);
gpr_free(env);
return disabled;
}
static bool connectivity_watcher_unref(grpc_exec_ctx* exec_ctx) {
if (gpr_unref(&g_watcher->refs)) {
gpr_mu_lock(&g_watcher_mu);
grpc_completion_queue_destroy(g_watcher->cq);
gpr_free(g_watcher);
g_watcher = NULL;
gpr_mu_unlock(&g_watcher_mu);
return true;
}
return false;
}
static void check_connectivity_state(grpc_exec_ctx* exec_ctx, void* ignored,
grpc_error* error) {
grpc_event ev;
while (true) {
gpr_mu_lock(&g_watcher_mu);
if (g_watcher->shutting_down) {
// Drain cq if the watcher is shutting down
ev = grpc_completion_queue_next(
g_watcher->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
} else {
ev = grpc_completion_queue_next(g_watcher->cq,
gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (ev.type == GRPC_QUEUE_TIMEOUT) {
ev = grpc_completion_queue_next(
g_watcher->cq, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_timer_init(
exec_ctx, &g_watcher->watcher_timer,
gpr_time_add(now, gpr_time_from_millis(
DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS,
GPR_TIMESPAN)),
&g_watcher->check_connectivity_closure, now);
gpr_mu_unlock(&g_watcher_mu);
break;
}
}
}
gpr_mu_unlock(&g_watcher_mu);
if (ev.type != GRPC_OP_COMPLETE) {
break;
}
channel_state* s = (channel_state*)(ev.tag);
s->state = grpc_client_channel_check_connectivity_state(
exec_ctx, s->client_channel_elem, false /* try_to_connect */);
if (s->state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_CHANNEL_STACK_UNREF(exec_ctx, s->channel_stack,
"connectivity_watcher_stop_watching");
gpr_free(s);
if (connectivity_watcher_unref(exec_ctx)) {
break;
}
} else {
grpc_channel_watch_connectivity_state_internal(
exec_ctx, s->client_channel_elem, s->channel_stack, s->state,
gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, s);
}
}
}
void grpc_client_channel_start_watching_connectivity(
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
grpc_channel_stack* channel_stack) {
if (is_disabled()) return;
GRPC_CHANNEL_STACK_REF(channel_stack, "connectivity_watcher_start_watching");
gpr_once_init(&g_once, init_g_watcher_mu);
gpr_mu_lock(&g_watcher_mu);
if (g_watcher == NULL) {
g_watcher = (connectivity_watcher*)gpr_zalloc(sizeof(connectivity_watcher));
g_watcher->cq = grpc_completion_queue_create_internal(
GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING);
gpr_ref_init(&g_watcher->refs, 0);
GRPC_CLOSURE_INIT(&g_watcher->check_connectivity_closure,
check_connectivity_state, NULL,
grpc_schedule_on_exec_ctx);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_timer_init(
exec_ctx, &g_watcher->watcher_timer,
gpr_time_add(
now, gpr_time_from_millis(DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS,
GPR_TIMESPAN)),
&g_watcher->check_connectivity_closure, now);
}
start_watching_locked(exec_ctx, client_channel_elem, channel_stack);
gpr_mu_init(&g_watcher_mu);
}
void grpc_client_channel_stop_watching_connectivity(
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem,
grpc_channel_stack* channel_stack) {
if (is_disabled()) return;
gpr_once_init(&g_once, init_g_watcher_mu);
gpr_mu_lock(&g_watcher_mu);
if (--g_watcher->channel_count == 0) {
g_watcher->shutting_down = true;
grpc_timer_cancel(exec_ctx, &g_watcher->watcher_timer);
connectivity_watcher_unref(exec_ctx);
}
gpr_mu_unlock(&g_watcher_mu);
}

@ -254,12 +254,11 @@ CORE_SOURCE_FILES = [
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/channel_connectivity_internal.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
'src/core/ext/filters/client_channel/client_channel_plugin.cc',
'src/core/ext/filters/client_channel/connectivity_watcher.cc',
'src/core/ext/filters/client_channel/connector.cc',
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',

@ -470,8 +470,9 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
BuildAndStartServer();
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(600 * grpc_test_slowdown_factor(), GPR_TIMESPAN)));
SendRpc(1);
}

@ -709,8 +709,9 @@ TEST_P(End2endTest, ReconnectChannel) {
RestartServer(std::shared_ptr<AuthMetadataProcessor>());
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(600 * grpc_test_slowdown_factor(), GPR_TIMESPAN)));
SendRpc(stub_.get(), 1, false);
}

@ -907,16 +907,14 @@ src/core/ext/census/trace_string.h \
src/core/ext/census/tracing.cc \
src/core/ext/census/tracing.h \
src/core/ext/filters/client_channel/README.md \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/backup_poller.h \
src/core/ext/filters/client_channel/channel_connectivity.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.cc \
src/core/ext/filters/client_channel/channel_connectivity_internal.h \
src/core/ext/filters/client_channel/client_channel.cc \
src/core/ext/filters/client_channel/client_channel.h \
src/core/ext/filters/client_channel/client_channel_factory.cc \
src/core/ext/filters/client_channel/client_channel_factory.h \
src/core/ext/filters/client_channel/client_channel_plugin.cc \
src/core/ext/filters/client_channel/connectivity_watcher.cc \
src/core/ext/filters/client_channel/connectivity_watcher.h \
src/core/ext/filters/client_channel/connector.cc \
src/core/ext/filters/client_channel/connector.h \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \

@ -8467,10 +8467,9 @@
"grpc_deadline_filter"
],
"headers": [
"src/core/ext/filters/client_channel/channel_connectivity_internal.h",
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/connectivity_watcher.h",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/http_connect_handshaker.h",
"src/core/ext/filters/client_channel/http_proxy.h",
@ -8492,16 +8491,14 @@
"language": "c",
"name": "grpc_client_channel",
"src": [
"src/core/ext/filters/client_channel/backup_poller.cc",
"src/core/ext/filters/client_channel/backup_poller.h",
"src/core/ext/filters/client_channel/channel_connectivity.cc",
"src/core/ext/filters/client_channel/channel_connectivity_internal.cc",
"src/core/ext/filters/client_channel/channel_connectivity_internal.h",
"src/core/ext/filters/client_channel/client_channel.cc",
"src/core/ext/filters/client_channel/client_channel.h",
"src/core/ext/filters/client_channel/client_channel_factory.cc",
"src/core/ext/filters/client_channel/client_channel_factory.h",
"src/core/ext/filters/client_channel/client_channel_plugin.cc",
"src/core/ext/filters/client_channel/connectivity_watcher.cc",
"src/core/ext/filters/client_channel/connectivity_watcher.h",
"src/core/ext/filters/client_channel/connector.cc",
"src/core/ext/filters/client_channel/connector.h",
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",

Loading…
Cancel
Save