Merge pull request #12878 from markdroth/pick_first_subchannel_list

Refactor subchannel_list code out of RR and use it in PF.
pull/13069/head
Mark D. Roth 8 years ago committed by GitHub
commit 662ec97674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      BUILD
  2. 2
      CMakeLists.txt
  3. 2
      Makefile
  4. 1
      binding.gyp
  5. 10
      build.yaml
  6. 2
      config.m4
  7. 1
      config.w32
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 2
      include/grpc/impl/codegen/connectivity_state.h
  12. 2
      package.xml
  13. 4
      src/core/ext/filters/client_channel/client_channel.cc
  14. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  15. 686
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  16. 452
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  17. 265
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
  18. 153
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  19. 4
      src/core/ext/filters/client_channel/subchannel.h
  20. 3
      src/core/lib/transport/connectivity_state.cc
  21. 1
      src/python/grpcio/grpc_core_dependencies.py
  22. 4
      test/core/client_channel/lb_policies_test.c
  23. 4
      test/cpp/end2end/client_lb_end2end_test.cc
  24. 2
      tools/doxygen/Doxyfile.core.internal
  25. 25
      tools/run_tests/generated/sources_and_headers.json

17
BUILD

@ -1074,6 +1074,21 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "grpc_lb_subchannel_list",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/subchannel_list.h",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
],
)
grpc_cc_library( grpc_cc_library(
name = "grpc_lb_policy_pick_first", name = "grpc_lb_policy_pick_first",
srcs = [ srcs = [
@ -1083,6 +1098,7 @@ grpc_cc_library(
deps = [ deps = [
"grpc_base", "grpc_base",
"grpc_client_channel", "grpc_client_channel",
"grpc_lb_subchannel_list",
], ],
) )
@ -1095,6 +1111,7 @@ grpc_cc_library(
deps = [ deps = [
"grpc_base", "grpc_base",
"grpc_client_channel", "grpc_client_channel",
"grpc_lb_subchannel_list",
], ],
) )

@ -1192,6 +1192,7 @@ add_library(grpc
third_party/nanopb/pb_encode.c third_party/nanopb/pb_encode.c
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@ -2358,6 +2359,7 @@ add_library(grpc_unsecure
third_party/nanopb/pb_decode.c third_party/nanopb/pb_decode.c
third_party/nanopb/pb_encode.c third_party/nanopb/pb_encode.c
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/census/base_resources.cc src/core/ext/census/base_resources.cc
src/core/ext/census/context.cc src/core/ext/census/context.cc

@ -3192,6 +3192,7 @@ LIBGRPC_SRC = \
third_party/nanopb/pb_encode.c \ third_party/nanopb/pb_encode.c \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
@ -4322,6 +4323,7 @@ LIBGRPC_UNSECURE_SRC = \
third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_decode.c \
third_party/nanopb/pb_encode.c \ third_party/nanopb/pb_encode.c \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/census/base_resources.cc \ src/core/ext/census/base_resources.cc \
src/core/ext/census/context.cc \ src/core/ext/census/context.cc \

@ -892,6 +892,7 @@
'third_party/nanopb/pb_encode.c', 'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',

@ -590,6 +590,7 @@ filegroups:
uses: uses:
- grpc_base - grpc_base
- grpc_client_channel - grpc_client_channel
- grpc_lb_subchannel_list
- name: grpc_lb_policy_round_robin - name: grpc_lb_policy_round_robin
src: src:
- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@ -597,6 +598,15 @@ filegroups:
uses: uses:
- grpc_base - grpc_base
- grpc_client_channel - grpc_client_channel
- grpc_lb_subchannel_list
- name: grpc_lb_subchannel_list
headers:
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
src:
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
uses:
- grpc_base
- grpc_client_channel
- name: grpc_max_age_filter - name: grpc_max_age_filter
headers: headers:
- src/core/ext/filters/max_age/max_age_filter.h - src/core/ext/filters/max_age/max_age_filter.h

@ -317,6 +317,7 @@ if test "$PHP_GRPC" != "no"; then
third_party/nanopb/pb_encode.c \ third_party/nanopb/pb_encode.c \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
@ -660,6 +661,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census/gen) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census/gen)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first)

@ -294,6 +294,7 @@ if (PHP_GRPC != "no") {
"third_party\\nanopb\\pb_encode.c " + "third_party\\nanopb\\pb_encode.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\fake\\fake_resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\fake\\fake_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\subchannel_list.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver_posix.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver_posix.cc " +

@ -448,6 +448,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/load_reporting/server_load_reporting_filter.h', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',
@ -704,6 +705,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
@ -951,6 +953,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/load_reporting/server_load_reporting_filter.h', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.h',

@ -383,6 +383,7 @@ Gem::Specification.new do |s|
s.files += %w( third_party/nanopb/pb_decode.h ) s.files += %w( third_party/nanopb/pb_decode.h )
s.files += %w( third_party/nanopb/pb_encode.h ) s.files += %w( third_party/nanopb/pb_encode.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h )
s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.h ) s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.h )
@ -642,6 +643,7 @@ Gem::Specification.new do |s|
s.files += %w( third_party/nanopb/pb_encode.c ) s.files += %w( third_party/nanopb/pb_encode.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc )

@ -458,6 +458,7 @@
'third_party/nanopb/pb_encode.c', 'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
@ -1135,6 +1136,7 @@
'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c', 'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/census/base_resources.cc', 'src/core/ext/census/base_resources.cc',
'src/core/ext/census/context.cc', 'src/core/ext/census/context.cc',

@ -25,8 +25,6 @@ extern "C" {
/** Connectivity state of a channel. */ /** Connectivity state of a channel. */
typedef enum { typedef enum {
/** channel has just been initialized */
GRPC_CHANNEL_INIT = -1,
/** channel is idle */ /** channel is idle */
GRPC_CHANNEL_IDLE, GRPC_CHANNEL_IDLE,
/** channel is connecting */ /** channel is connecting */

@ -395,6 +395,7 @@
<file baseinstalldir="/" name="third_party/nanopb/pb_decode.h" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_decode.h" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_encode.h" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_encode.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.h" role="src" />
@ -654,6 +655,7 @@
<file baseinstalldir="/" name="third_party/nanopb/pb_encode.c" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_encode.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc" role="src" />

@ -898,7 +898,7 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
call_data *calld = (call_data *)elem->call_data; call_data *calld = (call_data *)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, calld->waiting_for_pick_batches_count, elem->channel_data, calld, calld->waiting_for_pick_batches_count,
grpc_error_string(error)); grpc_error_string(error));
} }
@ -940,7 +940,7 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
channel_data *chand = (channel_data *)elem->channel_data; channel_data *chand = (channel_data *)elem->channel_data;
call_data *calld = (call_data *)elem->call_data; call_data *calld = (call_data *)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR
" pending batches to subchannel_call=%p", " pending batches to subchannel_call=%p",
chand, calld, calld->waiting_for_pick_batches_count, chand, calld, calld->waiting_for_pick_batches_count,
calld->subchannel_call); calld->subchannel_call);

@ -611,7 +611,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
break; break;
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
@ -1790,7 +1789,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// embedded RR policy. Note that the current RR policy, if any, will stay in // embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received. // effect until an update from the new lb_call is received.
switch (glb_policy->lb_channel_connectivity) { switch (glb_policy->lb_channel_connectivity) {
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: { case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */ /* resub. */

@ -20,6 +20,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/ext/filters/client_channel/subchannel_index.h"
@ -42,103 +43,73 @@ typedef struct {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
/** all our subchannels */ /** all our subchannels */
grpc_subchannel **subchannels; grpc_lb_subchannel_list *subchannel_list;
grpc_subchannel **new_subchannels; /** latest pending subchannel list */
size_t num_subchannels; grpc_lb_subchannel_list *latest_pending_subchannel_list;
size_t num_new_subchannels; /** selected subchannel in \a subchannel_list */
grpc_lb_subchannel_data *selected;
grpc_closure connectivity_changed;
/** remaining members are protected by the combiner */
/** the selected channel */
grpc_connected_subchannel *selected;
/** the subchannel key for \a selected, or NULL if \a selected not set */
const grpc_subchannel_key *selected_key;
/** have we started picking? */ /** have we started picking? */
bool started_picking; bool started_picking;
/** are we shut down? */ /** are we shut down? */
bool shutdown; bool shutdown;
/** are we updating the selected subchannel? */
bool updating_selected;
/** are we updating the subchannel candidates? */
bool updating_subchannels;
/** args from the latest update received while already updating, or NULL */
grpc_lb_policy_args *pending_update_args;
/** which subchannel are we watching? */
size_t checking_subchannel;
/** what is the connectivity of that channel? */
grpc_connectivity_state checking_connectivity;
/** list of picks that are waiting on connectivity */ /** list of picks that are waiting on connectivity */
pending_pick *pending_picks; pending_pick *pending_picks;
/** our connectivity state tracker */ /** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy; } pick_first_lb_policy;
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
GPR_ASSERT(p->subchannel_list == NULL);
GPR_ASSERT(p->latest_pending_subchannel_list == NULL);
GPR_ASSERT(p->pending_picks == NULL); GPR_ASSERT(p->pending_picks == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
}
if (p->selected != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
grpc_subchannel_index_unref();
if (p->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
gpr_free(p->pending_update_args);
}
gpr_free(p->subchannels);
gpr_free(p->new_subchannels);
gpr_free(p); gpr_free(p);
grpc_subchannel_index_unref();
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p); gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
} }
} }
static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p,
pick_first_lb_policy *p) { grpc_error *error) {
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
p->shutdown = true;
pending_pick *pp; pending_pick *pp;
while ((pp = p->pending_picks) != NULL) { while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = NULL; *pp->target = NULL;
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp); gpr_free(pp);
} }
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"shutdown");
if (p->subchannel_list != NULL) {
grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"pf_shutdown");
p->subchannel_list = NULL;
}
if (p->latest_pending_subchannel_list != NULL) {
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = NULL;
}
GRPC_ERROR_UNREF(error);
} }
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; shutdown_locked(exec_ctx, (pick_first_lb_policy *)pol,
p->shutdown = true; GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
fail_pending_picks_for_shutdown(exec_ctx, p);
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
/* cancel subscription */
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
} else if (p->num_subchannels > 0 && p->started_picking) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
} }
static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp = p->pending_picks;
pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
@ -162,8 +133,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp = p->pending_picks;
pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
@ -185,15 +155,12 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx, static void start_picking_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) { pick_first_lb_policy *p) {
p->started_picking = true; p->started_picking = true;
if (p->subchannels != NULL) { if (p->subchannel_list != NULL && p->subchannel_list->num_subchannels > 0) {
GPR_ASSERT(p->num_subchannels > 0); p->subchannel_list->checking_subchannel = 0;
p->checking_subchannel = 0; grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->checking_connectivity = GRPC_CHANNEL_IDLE; p->subchannel_list, "connectivity_watch+start_picking");
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_lb_subchannel_data_start_connectivity_watch(
grpc_subchannel_notify_on_state_change( exec_ctx, &p->subchannel_list->subchannels[0]);
exec_ctx, p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} }
} }
@ -210,19 +177,17 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data, grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) { grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; // If we have a selected subchannel already, return synchronously.
/* Check atomically for a selected channel */
if (p->selected != NULL) { if (p->selected != NULL) {
*target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel,
"picked");
return 1; return 1;
} }
// No subchannel selected yet, so handle asynchronously.
/* No subchannel selected yet, so try again */
if (!p->started_picking) { if (!p->started_picking) {
start_picking_locked(exec_ctx, p); start_picking_locked(exec_ctx, p);
} }
pp = (pending_pick *)gpr_malloc(sizeof(*pp)); pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->target = target; pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
@ -231,19 +196,15 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0; return 0;
} }
static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, static void destroy_unselected_subchannels_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) { pick_first_lb_policy *p) {
size_t num_subchannels = p->num_subchannels; for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
grpc_subchannel **subchannels = p->subchannels; grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[i];
if (p->selected != sd) {
p->num_subchannels = 0; grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
p->subchannels = NULL; "selected_different_subchannel");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); }
for (size_t i = 0; i < num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
} }
gpr_free(subchannels);
} }
static grpc_connectivity_state pf_check_connectivity_locked( static grpc_connectivity_state pf_check_connectivity_locked(
@ -265,46 +226,24 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) { grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (p->selected) { if (p->selected) {
grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel,
closure);
} else { } else {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
} }
} }
/* unsubscribe all subchannels */ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, grpc_error *error);
pick_first_lb_policy *p) {
if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
(void *)p, (void *)p->subchannels[p->checking_subchannel]);
}
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
p->updating_subchannels = true;
} else if (p->selected != NULL) {
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG,
"Pick First %p unsubscribing from selected subchannel %p",
(void *)p, (void *)p->selected);
}
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
p->updating_selected = true;
}
}
/* true upon success */
static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) { const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy; pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
const grpc_arg *arg = const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
if (p->subchannels == NULL) { if (p->subchannel_list == NULL) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE. // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
@ -321,270 +260,222 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
} }
const grpc_lb_addresses *addresses = const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p; (const grpc_lb_addresses *)arg->value.pointer.p;
if (addresses->num_addresses == 0) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
// Empty update. Unsubscribe from all current subchannels and put the gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
// channel in TRANSIENT_FAILURE. (void *)p, (unsigned long)addresses->num_addresses);
}
grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create(
exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args,
pf_connectivity_changed_locked);
if (subchannel_list->num_subchannels == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty"); "pf_update_empty");
stop_connectivity_watchers(exec_ctx, p); if (p->subchannel_list != NULL) {
return; grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_empty_update");
} }
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { p->subchannel_list = subchannel_list; // Empty list.
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", p->selected = NULL;
(void *)p, (unsigned long)addresses->num_addresses); return;
}
grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc(
sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_LB_ADDRESSES};
size_t sc_args_count = 0;
/* Create list of subchannel args for new addresses in \a args. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
// If there were any balancer, we would have chosen grpclb policy instead.
GPR_ASSERT(!addresses->addresses[i].is_balancer);
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
} }
grpc_arg addr_arg = if (p->selected == NULL) {
grpc_create_subchannel_address_arg(&addresses->addresses[i].address); // We don't yet have a selected subchannel, so replace the current
grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( // subchannel list immediately.
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, if (p->subchannel_list != NULL) {
1); grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
gpr_free(addr_arg.value.string); "pf_update_before_selected");
sc_args[sc_args_count++].args = new_args;
} }
p->subchannel_list = subchannel_list;
/* Check if p->selected is amongst them. If so, we are done. */ } else {
if (p->selected != NULL) { // We do have a selected subchannel.
GPR_ASSERT(p->selected_key != NULL); // Check if it's present in the new list. If so, we're done.
for (size_t i = 0; i < sc_args_count; i++) { for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
const bool found_selected = if (sd->subchannel == p->selected->subchannel) {
grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
if (found_selected) {
// The currently selected subchannel is in the update: we are done. // The currently selected subchannel is in the update: we are done.
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p amongst " "Pick First %p found already selected subchannel %p "
"updates. Update done.", "at update index %" PRIuPTR " of %" PRIuPTR "; update done",
(void *)p, (void *)p->selected); p, p->selected->subchannel, i,
} subchannel_list->num_subchannels);
for (size_t j = 0; j < sc_args_count; j++) { }
grpc_channel_args_destroy(exec_ctx, grpc_lb_subchannel_list_ref_for_connectivity_watch(
(grpc_channel_args *)sc_args[j].args); subchannel_list, "connectivity_watch+replace_selected");
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
if (p->subchannel_list != NULL) {
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "pf_update_includes_selected");
}
p->subchannel_list = subchannel_list;
if (p->selected->connected_subchannel != NULL) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "pf_update_includes_selected");
}
p->selected = sd;
destroy_unselected_subchannels_locked(exec_ctx, p);
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
if (p->latest_pending_subchannel_list != NULL) {
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->latest_pending_subchannel_list,
"pf_update_includes_selected+outdated");
p->latest_pending_subchannel_list = NULL;
} }
gpr_free(sc_args);
return; return;
} }
} }
} // Not keeping the previous selected subchannel, so set the latest
// We only check for already running updates here because if the previous // pending subchannel list to the new subchannel list. We will wait
// steps were successful, the update can be considered done without any // for it to report READY before swapping it into the current
// interference (ie, no callbacks were scheduled). // subchannel list.
if (p->updating_selected || p->updating_subchannels) { if (p->latest_pending_subchannel_list != NULL) {
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_DEBUG,
"Update already in progress for pick first %p. Deferring update.", "Pick First %p Shutting down latest pending subchannel list "
(void *)p); "%p, about to be replaced by newer latest %p",
} (void *)p, (void *)p->latest_pending_subchannel_list,
if (p->pending_update_args != NULL) { (void *)subchannel_list);
grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
gpr_free(p->pending_update_args);
} }
p->pending_update_args = grpc_lb_subchannel_list_shutdown_and_unref(
(grpc_lb_policy_args *)gpr_zalloc(sizeof(*p->pending_update_args)); exec_ctx, p->latest_pending_subchannel_list,
p->pending_update_args->client_channel_factory = "sl_outdated_dont_smash");
args->client_channel_factory;
p->pending_update_args->args = grpc_channel_args_copy(args->args);
p->pending_update_args->combiner = args->combiner;
return;
} }
/* Create the subchannels for the new subchannel args/addresses. */ p->latest_pending_subchannel_list = subchannel_list;
grpc_subchannel **new_subchannels =
(grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
size_t num_new_subchannels = 0;
for (size_t i = 0; i < sc_args_count; i++) {
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args[i]);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_INFO,
"Pick First %p created subchannel %p for address uri %s",
(void *)p, (void *)subchannel, address_uri);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
}
gpr_free(sc_args);
if (num_new_subchannels == 0) {
gpr_free(new_subchannels);
// Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
"pf_update_no_valid_addresses");
stop_connectivity_watchers(exec_ctx, p);
return;
} }
// If we've started picking, start trying to connect to the first
/* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ // subchannel in the new list.
stop_connectivity_watchers(exec_ctx, p);
/* Save new subchannels. The switch over will happen in
* pf_connectivity_changed_locked */
if (p->updating_selected || p->updating_subchannels) {
p->num_new_subchannels = num_new_subchannels;
p->new_subchannels = new_subchannels;
} else { /* nothing is updating. Get things moving from here */
p->num_subchannels = num_new_subchannels;
p->subchannels = new_subchannels;
p->new_subchannels = NULL;
p->num_new_subchannels = 0;
if (p->started_picking) { if (p->started_picking) {
p->checking_subchannel = 0; grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->checking_connectivity = GRPC_CHANNEL_IDLE; subchannel_list, "connectivity_watch+update");
grpc_subchannel_notify_on_state_change( grpc_lb_subchannel_data_start_connectivity_watch(
exec_ctx, p->subchannels[p->checking_subchannel], exec_ctx, &subchannel_list->subchannels[0]);
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
} }
} }
static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)arg; grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg;
grpc_subchannel *selected_subchannel; pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy;
pending_pick *pp;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(
GPR_DEBUG,
"Pick First %p connectivity changed. Updating selected: %d; Updating "
"subchannels: %d; Checking %lu index (%lu total); State: %d; ",
(void *)p, p->updating_selected, p->updating_subchannels,
(unsigned long)p->checking_subchannel,
(unsigned long)p->num_subchannels, p->checking_connectivity);
}
bool restart = false;
if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p", gpr_log(GPR_DEBUG,
(void *)p, (void *)p->selected); "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
} " of %" PRIuPTR
p->updating_selected = false; "), subchannel_list %p: state=%s p->shutdown=%d "
if (p->num_new_subchannels == 0) { "sd->subchannel_list->shutting_down=%d error=%s",
p->selected = NULL; (void *)p, (void *)sd->subchannel,
sd->subchannel_list->checking_subchannel,
sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list,
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
p->shutdown, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_shutdown");
return; return;
} }
restart = true; // If the subchannel list is shutting down, stop watching.
} if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
if (p->updating_subchannels && error != GRPC_ERROR_NONE) { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
/* Captured the unsubscription for the checking subchannel */ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown");
GPR_ASSERT(p->selected == NULL); grpc_lb_subchannel_list_unref_for_connectivity_watch(
for (size_t i = 0; i < p->num_subchannels; i++) { exec_ctx, sd->subchannel_list, "pf_sl_shutdown");
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
"pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
(void *)p->subchannels[i]);
}
}
gpr_free(p->subchannels);
p->subchannels = NULL;
p->num_subchannels = 0;
p->updating_subchannels = false;
if (p->num_new_subchannels == 0) return;
restart = true;
}
if (restart) {
p->selected = NULL;
p->selected_key = NULL;
GPR_ASSERT(p->new_subchannels != NULL);
GPR_ASSERT(p->num_new_subchannels > 0);
p->num_subchannels = p->num_new_subchannels;
p->subchannels = p->new_subchannels;
p->num_new_subchannels = 0;
p->new_subchannels = NULL;
if (p->started_picking) {
/* If we were picking, continue to do so over the new subchannels,
* starting from the 0th index. */
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
/* reuses the weak ref from start_picking_locked */
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
if (p->pending_update_args != NULL) {
const grpc_lb_policy_args *args = p->pending_update_args;
p->pending_update_args = NULL;
pf_update_locked(exec_ctx, &p->base, args);
}
return; return;
} }
GRPC_ERROR_REF(error); // If we're still here, the notification must be for a subchannel in
if (p->shutdown) { // either the current or latest pending subchannel lists.
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
GRPC_ERROR_UNREF(error); sd->subchannel_list == p->latest_pending_subchannel_list);
return; // Update state.
} else if (p->selected != NULL) { sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { // Handle updates for the currently selected subchannel.
if (p->selected == sd) {
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != NULL) {
p->selected = NULL;
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */ /* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
} }
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, GRPC_ERROR_REF(error), sd->curr_connectivity_state,
"selected_changed"); GRPC_ERROR_REF(error), "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change( // Renew notification.
exec_ctx, p->selected, p->base.interested_parties, grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
&p->checking_connectivity, &p->connectivity_changed);
} else { } else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
} }
} else { }
loop: return;
switch (p->checking_connectivity) { }
case GRPC_CHANNEL_INIT: // If we get here, there are two possible cases:
GPR_UNREACHABLE_CODE(return ); // 1. We do not currently have a selected subchannel, and the update is
case GRPC_CHANNEL_READY: // for a subchannel in p->subchannel_list that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
}
while (true) {
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != NULL);
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "finish_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
// Cases 1 and 2.
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE, GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready"); "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel]; sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(sd->subchannel),
grpc_subchannel_get_connected_subchannel(selected_subchannel), "connected");
"picked_first"); p->selected = sd;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void *)p,
"Pick First %p selected subchannel %p (connected %p)", (void *)sd->subchannel);
(void *)p, (void *)selected_subchannel, (void *)p->selected); }
} // Drop all other subchannels, since we are now connected.
p->selected_key = grpc_subchannel_get_key(selected_subchannel); destroy_unselected_subchannels_locked(exec_ctx, p);
/* drop the pick list: we are connected now */ // Update any calls that were waiting for a pick.
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); pending_pick *pp;
destroy_subchannels_locked(exec_ctx, p);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p", "Servicing pending pick with selected subchannel %p",
@ -593,71 +484,86 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp); gpr_free(pp);
} }
grpc_connected_subchannel_notify_on_state_change( // Renew notification.
exec_ctx, p->selected, p->base.interested_parties, grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
&p->checking_connectivity, &p->connectivity_changed); return;
break; }
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE: {
p->checking_subchannel = do {
(p->checking_subchannel + 1) % p->num_subchannels; sd->subchannel_list->checking_subchannel =
if (p->checking_subchannel == 0) { (sd->subchannel_list->checking_subchannel + 1) %
/* only trigger transient failure when we've tried all alternatives sd->subchannel_list->num_subchannels;
*/ sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure"); GRPC_ERROR_REF(error), "connecting_transient_failure");
} }
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity( if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->subchannels[p->checking_subchannel], &error); // Reuses the connectivity refs from the previous watch.
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
grpc_subchannel_notify_on_state_change( return;
exec_ctx, p->subchannels[p->checking_subchannel], }
p->base.interested_parties, &p->checking_connectivity, break; // Go back to top of loop.
&p->connectivity_changed);
} else {
goto loop;
} }
break;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error), "connecting_changed"); GRPC_ERROR_REF(error), "connecting_changed");
grpc_subchannel_notify_on_state_change( }
exec_ctx, p->subchannels[p->checking_subchannel], // Renew notification.
p->base.interested_parties, &p->checking_connectivity, grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
&p->connectivity_changed); return;
break; }
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN: {
p->num_subchannels--; grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], "pf_candidate_shutdown");
p->subchannels[p->num_subchannels]); // Advance to next subchannel and check its state.
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], grpc_lb_subchannel_data *original_sd = sd;
"pick_first"); do {
if (p->num_subchannels == 0) { sd->subchannel_list->checking_subchannel =
grpc_connectivity_state_set( (sd->subchannel_list->checking_subchannel + 1) %
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1), "Pick first exhausted channels", &error, 1));
"no_more_channels"); return;
fail_pending_picks_for_shutdown(exec_ctx, p); }
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, if (sd->subchannel_list == p->subchannel_list) {
"pick_first_connectivity");
} else {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed"); GRPC_ERROR_REF(error), "subchannel_failed");
p->checking_subchannel %= p->num_subchannels; }
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity( if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
p->subchannels[p->checking_subchannel], &error); // Reuses the connectivity refs from the previous watch.
goto loop; grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
return;
}
// For any other state, go back to top of loop.
// We will reuse the connectivity refs from the previous watch.
} }
} }
} }
GRPC_ERROR_UNREF(error);
} }
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
@ -687,8 +593,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
pf_update_locked(exec_ctx, &p->base, args); pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref(); grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner));
return &p->base; return &p->base;
} }

@ -28,6 +28,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/ext/filters/client_channel/subchannel_index.h"
@ -64,12 +65,11 @@ typedef struct pending_pick {
grpc_closure *on_complete; grpc_closure *on_complete;
} pending_pick; } pending_pick;
typedef struct rr_subchannel_list rr_subchannel_list;
typedef struct round_robin_lb_policy { typedef struct round_robin_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
rr_subchannel_list *subchannel_list; grpc_lb_subchannel_list *subchannel_list;
/** have we started picking? */ /** have we started picking? */
bool started_picking; bool started_picking;
@ -89,157 +89,9 @@ typedef struct round_robin_lb_policy {
* lists if they equal \a latest_pending_subchannel_list. In other words, * lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any * racing callbacks that reference outdated subchannel lists won't perform any
* update. */ * update. */
rr_subchannel_list *latest_pending_subchannel_list; grpc_lb_subchannel_list *latest_pending_subchannel_list;
} round_robin_lb_policy; } round_robin_lb_policy;
typedef struct {
/** backpointer to owning subchannel list */
rr_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
/** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */
grpc_connectivity_state prev_connectivity_state;
/** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state;
/** connectivity state to be updated by the watcher, not guarded by
* the combiner. Will be moved to curr_connectivity_state inside of
* the combiner by rr_connectivity_changed_locked(). */
grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
const grpc_lb_user_data_vtable *user_data_vtable;
} subchannel_data;
struct rr_subchannel_list {
/** backpointer to owning policy */
round_robin_lb_policy *policy;
/** all our subchannels */
size_t num_subchannels;
subchannel_data *subchannels;
/** how many subchannels are in state READY */
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
/** how many subchannels are in state SHUTDOWN */
size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
/** There will be one ref for each entry in subchannels for which there is a
* pending connectivity state watcher callback. */
gpr_refcount refcount;
/** Is this list shutting down? This may be true due to the shutdown of the
* policy itself or because a newer update has arrived while this one hadn't
* finished processing. */
bool shutting_down;
};
static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p,
size_t num_subchannels) {
rr_subchannel_list *subchannel_list =
(rr_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list));
subchannel_list->policy = p;
subchannel_list->subchannels =
(subchannel_data *)gpr_zalloc(sizeof(subchannel_data) * num_subchannels);
subchannel_list->num_subchannels = num_subchannels;
gpr_ref_init(&subchannel_list->refcount, 1);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels",
(void *)p, (void *)subchannel_list, (unsigned long)num_subchannels);
}
return subchannel_list;
}
static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list) {
GPR_ASSERT(subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
(void *)subchannel_list->policy, (void *)subchannel_list);
}
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
"rr_subchannel_list_destroy");
}
sd->subchannel = NULL;
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
sd->user_data = NULL;
}
}
gpr_free(subchannel_list->subchannels);
gpr_free(subchannel_list);
}
static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
const char *reason) {
gpr_ref_non_zero(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
(unsigned long)(count - 1), (unsigned long)count, reason);
}
}
static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
const bool done = gpr_unref(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
(unsigned long)(count + 1), (unsigned long)count, reason);
}
if (done) {
rr_subchannel_list_destroy(exec_ctx, subchannel_list);
}
}
/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
* watcher's callback will ultimately unref \a subchannel_list. */
static void rr_subchannel_list_shutdown_and_unref(
grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list,
const char *reason) {
GPR_ASSERT(!subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list, reason);
}
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(
GPR_DEBUG,
"[RR %p] Unsubscribing from subchannel %p as part of shutting down "
"subchannel_list %p",
(void *)subchannel_list->policy, (void *)sd->subchannel,
(void *)subchannel_list);
}
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
}
}
rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
}
/** Returns the index into p->subchannel_list->subchannels of the next /** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state, or p->subchannel_list->num_subchannels if no * subchannel in READY state, or p->subchannel_list->num_subchannels if no
* subchannel is READY. * subchannel is READY.
@ -299,8 +151,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
(void *)p, (unsigned long)last_ready_index, (void *)p, (unsigned long)last_ready_index,
(void *)p->subchannel_list->subchannels[last_ready_index].subchannel, (void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void *)grpc_subchannel_get_connected_subchannel( (void *)p->subchannel_list->subchannels[last_ready_index]
p->subchannel_list->subchannels[last_ready_index].subchannel)); .connected_subchannel);
} }
} }
@ -310,47 +162,47 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
(void *)pol, (void *)pol); (void *)pol, (void *)pol);
} }
GPR_ASSERT(p->subchannel_list == NULL);
GPR_ASSERT(p->latest_pending_subchannel_list == NULL);
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
grpc_subchannel_index_unref(); grpc_subchannel_index_unref();
gpr_free(p); gpr_free(p);
} }
static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p,
round_robin_lb_policy *p) { grpc_error *error) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
p->shutdown = true;
pending_pick *pp; pending_pick *pp;
while ((pp = p->pending_picks) != NULL) { while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = NULL; *pp->target = NULL;
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp); gpr_free(pp);
} }
} grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { "rr_shutdown");
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (p->subchannel_list != NULL) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p",
(void *)pol, (void *)pol);
}
p->shutdown = true;
fail_pending_picks_for_shutdown(exec_ctx, p);
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
const bool latest_is_current =
p->subchannel_list == p->latest_pending_subchannel_list;
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_shutdown"); "sl_shutdown_rr_shutdown");
p->subchannel_list = NULL; p->subchannel_list = NULL;
if (!latest_is_current && p->latest_pending_subchannel_list != NULL && }
!p->latest_pending_subchannel_list->shutting_down) { if (p->latest_pending_subchannel_list != NULL) {
rr_subchannel_list_shutdown_and_unref(exec_ctx, grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list, exec_ctx, p->latest_pending_subchannel_list,
"sl_shutdown_pending_rr_shutdown"); "sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = NULL; p->latest_pending_subchannel_list = NULL;
} }
GRPC_ERROR_UNREF(error);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
} }
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@ -405,13 +257,10 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) { round_robin_lb_policy *p) {
p->started_picking = true; p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &p->subchannel_list->subchannels[i]; grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); "connectivity_watch");
rr_subchannel_list_ref(sd->subchannel_list, "started_picking"); grpc_lb_subchannel_data_start_connectivity_watch(
grpc_subchannel_notify_on_state_change( exec_ctx, &p->subchannel_list->subchannels[i]);
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
} }
} }
@ -436,10 +285,10 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) { if (next_ready_index < p->subchannel_list->num_subchannels) {
/* readily available, report right away */ /* readily available, report right away */
subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; grpc_lb_subchannel_data *sd =
*target = GRPC_CONNECTED_SUBCHANNEL_REF( &p->subchannel_list->subchannels[next_ready_index];
grpc_subchannel_get_connected_subchannel(sd->subchannel), *target =
"rr_picked"); GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
if (user_data != NULL) { if (user_data != NULL) {
*user_data = sd->user_data; *user_data = sd->user_data;
} }
@ -470,8 +319,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0; return 0;
} }
static void update_state_counters_locked(subchannel_data *sd) { static void update_state_counters_locked(grpc_lb_subchannel_data *sd) {
rr_subchannel_list *subchannel_list = sd->subchannel_list; grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0); GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready; --subchannel_list->num_ready;
@ -485,6 +334,7 @@ static void update_state_counters_locked(subchannel_data *sd) {
GPR_ASSERT(subchannel_list->num_idle > 0); GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle; --subchannel_list->num_idle;
} }
sd->prev_connectivity_state = sd->curr_connectivity_state;
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
++subchannel_list->num_ready; ++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -497,12 +347,12 @@ static void update_state_counters_locked(subchannel_data *sd) {
} }
/** Sets the policy's connectivity status based on that of the passed-in \a sd /** Sets the policy's connectivity status based on that of the passed-in \a sd
* (the subchannel_data associted with the updated subchannel) and the * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
* used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
* connectivity status set. */ * connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked( static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we /* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled). * are on rule n, all previous rules were unfulfilled).
* *
@ -524,8 +374,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: p->num_idle == p->subchannel_list->num_subchannels. * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/ */
grpc_connectivity_state new_state = sd->curr_connectivity_state; grpc_connectivity_state new_state = sd->curr_connectivity_state;
rr_subchannel_list *subchannel_list = sd->subchannel_list; grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list;
round_robin_lb_policy *p = subchannel_list->policy; round_robin_lb_policy *p = (round_robin_lb_policy *)subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready"); GRPC_ERROR_NONE, "rr_ready");
@ -561,8 +411,9 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
subchannel_data *sd = (subchannel_data *)arg; grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg;
round_robin_lb_policy *p = sd->subchannel_list->policy; round_robin_lb_policy *p =
(round_robin_lb_policy *)sd->subchannel_list->policy;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
@ -577,65 +428,50 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
} }
// If the policy is shutting down, unref and return. // If the policy is shutting down, unref and return.
if (p->shutdown) { if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
"pol_shutdown+started_picking"); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "rr_shutdown");
return; return;
} }
if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // If the subchannel list is shutting down, stop watching.
// the subchannel list associated with sd has been discarded. This callback if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
// corresponds to the unsubscription. The unrefs correspond to the picking grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
// ref (start_picking_locked or update_started_picking). grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown");
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, grpc_lb_subchannel_list_unref_for_connectivity_watch(
"sl_shutdown+started_picking"); exec_ctx, sd->subchannel_list, "rr_sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking");
return;
}
// Dispose of outdated subchannel lists.
if (sd->subchannel_list != p->subchannel_list &&
sd->subchannel_list != p->latest_pending_subchannel_list) {
const char *reason = NULL;
if (sd->subchannel_list->shutting_down) {
reason = "sl_outdated_straggler";
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason);
} else {
reason = "sl_outdated";
rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list,
reason);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason);
return; return;
} }
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
// Now that we're inside the combiner, copy the pending connectivity // Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to // state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner. // curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and determine new overall state. // Update state counters and determine new overall state.
update_state_counters_locked(sd); update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
const grpc_connectivity_state new_policy_connectivity_state = const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel, and if the new // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
// policy's state is SHUTDOWN, clean up. // policy's state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
sd->subchannel = NULL; grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
if (sd->user_data != NULL) { "rr_connectivity_shutdown");
GPR_ASSERT(sd->user_data_vtable != NULL); grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->user_data_vtable->destroy(exec_ctx, sd->user_data); exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
// The policy is shutting down. Fail all of the pending picks. shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
fail_pending_picks_for_shutdown(exec_ctx, p); }
}
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sd_shutdown+started_picking");
// unref the "rr_connectivity_update" weak ref from start_picking.
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"rr_connectivity_sd_shutdown");
} else { // sd not in SHUTDOWN } else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == NULL) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
}
if (sd->subchannel_list != p->subchannel_list) { if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list. // promote sd->subchannel_list to p->subchannel_list.
// sd->subchannel_list must be equal to // sd->subchannel_list must be equal to
@ -656,8 +492,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
} }
if (p->subchannel_list != NULL) { if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list // dispose of the current subchannel_list
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, grpc_lb_subchannel_list_shutdown_and_unref(
"sl_phase_out_shutdown"); exec_ctx, p->subchannel_list, "sl_phase_out_shutdown");
} }
p->subchannel_list = p->latest_pending_subchannel_list; p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL; p->latest_pending_subchannel_list = NULL;
@ -667,7 +503,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
subchannel_data *selected = grpc_lb_subchannel_data *selected =
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) { if (p->pending_picks != NULL) {
// if the selected subchannel is going to be used for the pending // if the selected subchannel is going to be used for the pending
@ -678,8 +514,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel), selected->connected_subchannel, "rr_picked");
"rr_picked");
if (pp->user_data != NULL) { if (pp->user_data != NULL) {
*pp->user_data = selected->user_data; *pp->user_data = selected->user_data;
} }
@ -694,12 +529,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp); gpr_free(pp);
} }
} }
/* renew notification: reuses the "rr_connectivity_update" weak ref on the // Renew notification.
* policy as well as the sd->subchannel_list ref. */ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
} }
} }
@ -723,13 +554,12 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) { if (next_ready_index < p->subchannel_list->num_subchannels) {
subchannel_data *selected = grpc_lb_subchannel_data *selected =
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel), selected->connected_subchannel, "rr_ping");
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure); grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping");
} else { } else {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected")); "Round Robin not connected"));
@ -742,130 +572,68 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_arg *arg = const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
if (p->subchannel_list == NULL) { if (p->subchannel_list == NULL) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing"); "rr_update_missing");
} else {
// otherwise, keep using the current subchannel list (ignore this update).
gpr_log(GPR_ERROR,
"[RR %p] No valid LB addresses channel arg for update, ignoring.",
(void *)p);
} }
return; return;
} }
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
rr_subchannel_list *subchannel_list = if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
rr_subchannel_list_create(p, addresses->num_addresses); gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p,
if (addresses->num_addresses == 0) { addresses->num_addresses);
}
grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create(
exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args,
rr_connectivity_changed_locked);
if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty"); "rr_update_empty");
if (p->subchannel_list != NULL) { if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_empty_update"); "sl_shutdown_empty_update");
} }
p->subchannel_list = subchannel_list; // empty list p->subchannel_list = subchannel_list; // empty list
return; return;
} }
size_t subchannel_index = 0; if (p->started_picking) {
if (p->latest_pending_subchannel_list != NULL && p->started_picking) { if (p->latest_pending_subchannel_list != NULL) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, about " "[RR %p] Shutting down latest pending subchannel list %p, "
"to be replaced by newer latest %p", "about to be replaced by newer latest %p",
(void *)p, (void *)p->latest_pending_subchannel_list, (void *)p, (void *)p->latest_pending_subchannel_list,
(void *)subchannel_list); (void *)subchannel_list);
} }
rr_subchannel_list_shutdown_and_unref( grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); exec_ctx, p->latest_pending_subchannel_list, "sl_outdated");
} }
p->latest_pending_subchannel_list = subchannel_list; p->latest_pending_subchannel_list = subchannel_list;
grpc_subchannel_args sc_args; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
/* We need to remove the LB addresses in order to be able to compare the /* Watch every new subchannel. A subchannel list becomes active the
* subchannel keys of subchannels from a different batch of addresses. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_LB_ADDRESSES};
/* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
// If there were any balancer, we would have chosen grpclb policy instead.
GPR_ASSERT(!addresses->addresses[i].is_balancer);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);
grpc_error *error;
// Get the connectivity state of the subchannel. Already existing ones may
// be in a state other than INIT.
const grpc_connectivity_state subchannel_connectivity_state =
grpc_subchannel_check_connectivity(subchannel, &error);
if (error != GRPC_ERROR_NONE) {
// The subchannel is in error (e.g. shutting down). Ignore it.
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
GRPC_ERROR_UNREF(error);
continue;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(
GPR_DEBUG,
"[RR %p] index %lu: Created subchannel %p for address uri %s into "
"subchannel_list %p. Connectivity state %s",
(void *)p, (unsigned long)subchannel_index, (void *)subchannel,
address_uri, (void *)subchannel_list,
grpc_connectivity_state_name(subchannel_connectivity_state));
gpr_free(address_uri);
}
subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
sd->subchannel_list = subchannel_list;
sd->subchannel = subchannel;
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner));
/* use some sentinel value outside of the range of
* grpc_connectivity_state to signal an undefined previous state. We
* won't be referring to this value again and it'll be overwritten after
* the first call to rr_connectivity_changed_locked */
sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
sd->curr_connectivity_state = subchannel_connectivity_state;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data);
}
if (p->started_picking) {
rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
/* 2. Watch every new subchannel. A subchannel list becomes active the
* moment one of its subchannels is READY. At that moment, we swap * moment one of its subchannels is READY. At that moment, we swap
* p->subchannel_list for sd->subchannel_list, provided the subchannel * p->subchannel_list for sd->subchannel_list, provided the subchannel
* list is still valid (ie, isn't shutting down) */ * list is still valid (ie, isn't shutting down) */
grpc_subchannel_notify_on_state_change( grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
exec_ctx, sd->subchannel, p->base.interested_parties, "connectivity_watch");
&sd->pending_connectivity_state_unsafe, grpc_lb_subchannel_data_start_connectivity_watch(
&sd->connectivity_changed_closure); exec_ctx, &subchannel_list->subchannels[i]);
}
} }
if (!p->started_picking) { } else {
// The policy isn't picking yet. Save the update for later, disposing of // The policy isn't picking yet. Save the update for later, disposing of
// previous version if any. // previous version if any.
if (p->subchannel_list != NULL) { if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, grpc_lb_subchannel_list_shutdown_and_unref(
"rr_update_before_started_picking"); exec_ctx, p->subchannel_list, "rr_update_before_started_picking");
} }
p->subchannel_list = subchannel_list; p->subchannel_list = subchannel_list;
p->latest_pending_subchannel_list = NULL;
} }
} }

@ -0,0 +1,265 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <string.h>
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx,
grpc_lb_subchannel_data *sd,
const char *reason) {
if (sd->subchannel != NULL) {
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
gpr_log(
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR
" of %" PRIuPTR " (subchannel %p): unreffing subchannel",
sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
}
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason);
sd->subchannel = NULL;
if (sd->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel,
reason);
sd->connected_subchannel = NULL;
}
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
sd->user_data = NULL;
}
}
}
void grpc_lb_subchannel_data_start_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): requesting connectivity change notification",
sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
sd->subchannel_list,
(size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties,
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
}
void grpc_lb_subchannel_data_stop_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
gpr_log(
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): stopping connectivity watch",
sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
}
GPR_ASSERT(sd->connectivity_notification_pending);
sd->connectivity_notification_pending = false;
}
grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer,
const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args,
grpc_iomgr_cb_func connectivity_changed_cb) {
grpc_lb_subchannel_list *subchannel_list =
(grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list));
if (GRPC_TRACER_ON(*tracer)) {
gpr_log(GPR_DEBUG,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
tracer->name, p, subchannel_list, addresses->num_addresses);
}
subchannel_list->policy = p;
subchannel_list->tracer = tracer;
gpr_ref_init(&subchannel_list->refcount, 1);
subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc(
sizeof(grpc_lb_subchannel_data) * addresses->num_addresses);
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_LB_ADDRESSES};
// Create a subchannel for each address.
grpc_subchannel_args sc_args;
size_t subchannel_index = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
// If there were any balancer, we would have chosen grpclb policy instead.
GPR_ASSERT(!addresses->addresses[i].is_balancer);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel == NULL) {
// Subchannel could not be created.
if (GRPC_TRACER_ON(*tracer)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG,
"[%s %p] could not create subchannel for address uri %s, "
"ignoring",
tracer->name, subchannel_list->policy, address_uri);
gpr_free(address_uri);
}
continue;
}
if (GRPC_TRACER_ON(*tracer)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s",
tracer->name, p, subchannel_list, subchannel_index, subchannel,
address_uri);
gpr_free(address_uri);
}
grpc_lb_subchannel_data *sd =
&subchannel_list->subchannels[subchannel_index++];
sd->subchannel_list = subchannel_list;
sd->subchannel = subchannel;
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
connectivity_changed_cb, sd,
grpc_combiner_scheduler(args->combiner));
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data);
}
}
subchannel_list->num_subchannels = subchannel_index;
subchannel_list->num_idle = subchannel_index;
return subchannel_list;
}
static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
grpc_lb_subchannel_list *subchannel_list) {
if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
subchannel_list->tracer->name, subchannel_list->policy,
subchannel_list);
}
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"subchannel_list_destroy");
}
gpr_free(subchannel_list->subchannels);
gpr_free(subchannel_list);
}
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list,
const char *reason) {
gpr_ref_non_zero(&subchannel_list->refcount);
if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
subchannel_list->tracer->name, subchannel_list->policy,
subchannel_list, (unsigned long)(count - 1), (unsigned long)count,
reason);
}
}
void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_subchannel_list *subchannel_list,
const char *reason) {
const bool done = gpr_unref(&subchannel_list->refcount);
if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
subchannel_list->tracer->name, subchannel_list->policy,
subchannel_list, (unsigned long)(count + 1), (unsigned long)count,
reason);
}
if (done) {
subchannel_list_destroy(exec_ctx, subchannel_list);
}
}
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list *subchannel_list, const char *reason) {
GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_ref(subchannel_list, reason);
}
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
const char *reason) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason);
grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
}
static void subchannel_data_cancel_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) {
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
gpr_log(
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): canceling connectivity watch (%s)",
sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel, reason);
}
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
&sd->connectivity_changed_closure);
}
void grpc_lb_subchannel_list_shutdown_and_unref(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
const char *reason) {
if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
subchannel_list->tracer->name, subchannel_list->policy,
subchannel_list, reason);
}
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
// If there's a pending notification for this subchannel, cancel it;
// the callback is responsible for unreffing the subchannel.
// Otherwise, unref the subchannel directly.
if (sd->connectivity_notification_pending) {
subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason);
} else if (sd->subchannel != NULL) {
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason);
}
}
grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
}

@ -0,0 +1,153 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and
// round_robin. However, the interface needs more work to provide clean
// encapsulation. For example, the structs here have some fields that are
// only used in one of the two (e.g., the state counters in
// grpc_lb_subchannel_list and the prev_connectivity_state field in
// grpc_lb_subchannel_data are only used in round_robin, and the
// checking_subchannel field in grpc_lb_subchannel_list is only used by
// pick_first). Also, there is probably some code duplication between the
// connectivity state notification callback code in both pick_first and
// round_robin that could be refactored and moved here. In a future PR,
// need to clean this up.
#ifdef __cplusplus
extern "C" {
#endif
typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
typedef struct {
/** backpointer to owning subchannel list */
grpc_lb_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
grpc_connected_subchannel *connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
/** previous and current connectivity states. Updated by \a
* \a connectivity_changed_closure based on
* \a pending_connectivity_state_unsafe. */
grpc_connectivity_state prev_connectivity_state;
grpc_connectivity_state curr_connectivity_state;
/** connectivity state to be updated by
* grpc_subchannel_notify_on_state_change(), not guarded by
* the combiner. To be copied to \a curr_connectivity_state by
* \a connectivity_changed_closure. */
grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
const grpc_lb_user_data_vtable *user_data_vtable;
} grpc_lb_subchannel_data;
/// Unrefs the subchannel contained in sd.
void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx,
grpc_lb_subchannel_data *sd,
const char *reason);
/// Starts watching the connectivity state of the subchannel.
/// The connectivity_changed_cb callback must invoke either
/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
/// grpc_lb_subchannel_data_start_connectivity_watch().
void grpc_lb_subchannel_data_start_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd);
/// Stops watching the connectivity state of the subchannel.
void grpc_lb_subchannel_data_stop_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd);
struct grpc_lb_subchannel_list {
/** backpointer to owning policy */
grpc_lb_policy *policy;
grpc_tracer_flag *tracer;
/** all our subchannels */
size_t num_subchannels;
grpc_lb_subchannel_data *subchannels;
/** Index into subchannels of the one we're currently checking.
* Used when connecting to subchannels serially instead of in parallel. */
// TODO(roth): When we have time, we can probably make this go away
// and compute the index dynamically by subtracting
// subchannel_list->subchannels from the subchannel_data pointer.
size_t checking_subchannel;
/** how many subchannels are in state READY */
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
/** how many subchannels are in state SHUTDOWN */
size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
/** There will be one ref for each entry in subchannels for which there is a
* pending connectivity state watcher callback. */
gpr_refcount refcount;
/** Is this list shutting down? This may be true due to the shutdown of the
* policy itself or because a newer update has arrived while this one hadn't
* finished processing. */
bool shutting_down;
};
grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer,
const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args,
grpc_iomgr_cb_func connectivity_changed_cb);
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list,
const char *reason);
void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_subchannel_list *subchannel_list,
const char *reason);
/// Takes and releases refs needed for a connectivity notification.
/// This includes a ref to subchannel_list and a weak ref to the LB policy.
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list *subchannel_list, const char *reason);
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
const char *reason);
/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
/// connectivity state notification callback will ultimately unref it.
void grpc_lb_subchannel_list_shutdown_and_unref(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
const char *reason);
#ifdef __cplusplus
}
#endif
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */

@ -127,8 +127,8 @@ void grpc_connected_subchannel_process_transport_op(
grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel, grpc_error **error); grpc_subchannel *channel, grpc_error **error);
/** call notify when the connectivity state of a channel changes from *state. /** Calls notify when the connectivity state of a channel becomes different
Updates *state with the new state of the channel */ from *state. Updates *state with the new state of the channel. */
void grpc_subchannel_notify_on_state_change( void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_pollset_set *interested_parties, grpc_connectivity_state *state,

@ -29,8 +29,6 @@ grpc_tracer_flag grpc_connectivity_state_trace =
const char *grpc_connectivity_state_name(grpc_connectivity_state state) { const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) { switch (state) {
case GRPC_CHANNEL_INIT:
return "INIT";
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
return "IDLE"; return "IDLE";
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
@ -174,7 +172,6 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_name(state), reason, error, error_string); grpc_connectivity_state_name(state), reason, error, error_string);
} }
switch (state) { switch (state) {
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:

@ -293,6 +293,7 @@ CORE_SOURCE_FILES = [
'third_party/nanopb/pb_encode.c', 'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',

@ -53,8 +53,8 @@ typedef struct request_sequences {
size_t n; /* number of iterations */ size_t n; /* number of iterations */
int *connections; /* indexed by the interation number, value is the index of int *connections; /* indexed by the interation number, value is the index of
the server it connected to or -1 if none */ the server it connected to or -1 if none */
int *connectivity_states; /* indexed by the interation number, value is the /* indexed by the interation number, value is the client connectivity state */
client connectivity state */ grpc_connectivity_state *connectivity_states;
} request_sequences; } request_sequences;
typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *, typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *,

@ -305,7 +305,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.clear(); ports.clear();
SetNextResolution(ports); SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET none *******"); gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; grpc_connectivity_state channel_state;
do { do {
channel_state = channel_->GetState(true /* try to connect */); channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY); } while (channel_state == GRPC_CHANNEL_READY);
@ -481,7 +481,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// An empty update will result in the channel going into TRANSIENT_FAILURE. // An empty update will result in the channel going into TRANSIENT_FAILURE.
ports.clear(); ports.clear();
SetNextResolution(ports); SetNextResolution(ports);
grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; grpc_connectivity_state channel_state;
do { do {
channel_state = channel_->GetState(true /* try to connect */); channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY); } while (channel_state == GRPC_CHANNEL_READY);

@ -935,6 +935,8 @@ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balan
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc \
src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
src/core/ext/filters/client_channel/lb_policy_factory.cc \ src/core/ext/filters/client_channel/lb_policy_factory.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \

@ -8684,7 +8684,8 @@
"deps": [ "deps": [
"gpr", "gpr",
"grpc_base", "grpc_base",
"grpc_client_channel" "grpc_client_channel",
"grpc_lb_subchannel_list"
], ],
"headers": [], "headers": [],
"is_filegroup": true, "is_filegroup": true,
@ -8700,7 +8701,8 @@
"deps": [ "deps": [
"gpr", "gpr",
"grpc_base", "grpc_base",
"grpc_client_channel" "grpc_client_channel",
"grpc_lb_subchannel_list"
], ],
"headers": [], "headers": [],
"is_filegroup": true, "is_filegroup": true,
@ -8712,6 +8714,25 @@
"third_party": false, "third_party": false,
"type": "filegroup" "type": "filegroup"
}, },
{
"deps": [
"gpr",
"grpc_base",
"grpc_client_channel"
],
"headers": [
"src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
],
"is_filegroup": true,
"language": "c",
"name": "grpc_lb_subchannel_list",
"src": [
"src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc",
"src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
],
"third_party": false,
"type": "filegroup"
},
{ {
"deps": [ "deps": [
"gpr", "gpr",

Loading…
Cancel
Save