Merge pull request #18917 from markdroth/subchannel_interface

Use SubchannelInterface to hide implementation from LB policy API
pull/19197/head
Mark D. Roth 6 years ago committed by GitHub
commit d26201494b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1
      BUILD.gn
  3. 1
      build.yaml
  4. 1
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 123
      src/core/ext/filters/client_channel/client_channel.cc
  9. 8
      src/core/ext/filters/client_channel/lb_policy.h
  10. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  11. 10
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  12. 7
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  13. 91
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  14. 11
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  15. 5
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  16. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  17. 26
      src/core/ext/filters/client_channel/subchannel.cc
  18. 46
      src/core/ext/filters/client_channel/subchannel.h
  19. 109
      src/core/ext/filters/client_channel/subchannel_interface.h
  20. 3
      test/core/util/test_lb_policies.cc
  21. 1
      tools/doxygen/Doxyfile.core.internal
  22. 2
      tools/run_tests/generated/sources_and_headers.json

@ -1143,6 +1143,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/server_address.h", "src/core/ext/filters/client_channel/server_address.h",
"src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.h", "src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.h",
], ],
language = "c++", language = "c++",

@ -345,6 +345,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.cc", "src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel.h", "src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.h",
"src/core/ext/filters/deadline/deadline_filter.cc", "src/core/ext/filters/deadline/deadline_filter.cc",

@ -595,6 +595,7 @@ filegroups:
- src/core/ext/filters/client_channel/server_address.h - src/core/ext/filters/client_channel/server_address.h
- src/core/ext/filters/client_channel/service_config.h - src/core/ext/filters/client_channel/service_config.h
- src/core/ext/filters/client_channel/subchannel.h - src/core/ext/filters/client_channel/subchannel.h
- src/core/ext/filters/client_channel/subchannel_interface.h
- src/core/ext/filters/client_channel/subchannel_pool_interface.h - src/core/ext/filters/client_channel/subchannel_pool_interface.h
src: src:
- src/core/ext/filters/client_channel/backup_poller.cc - src/core/ext/filters/client_channel/backup_poller.cc

@ -399,6 +399,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h', 'src/core/ext/filters/client_channel/health/health.pb.h',

@ -371,6 +371,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h', 'src/core/ext/filters/client_channel/health/health.pb.h',
@ -1023,6 +1024,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h', 'src/core/ext/filters/client_channel/health/health.pb.h',

@ -305,6 +305,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/server_address.h ) s.files += %w( src/core/ext/filters/client_channel/server_address.h )
s.files += %w( src/core/ext/filters/client_channel/service_config.h ) s.files += %w( src/core/ext/filters/client_channel/service_config.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel.h ) s.files += %w( src/core/ext/filters/client_channel/subchannel.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel_interface.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h ) s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h )
s.files += %w( src/core/ext/filters/deadline/deadline_filter.h ) s.files += %w( src/core/ext/filters/deadline/deadline_filter.h )
s.files += %w( src/core/ext/filters/client_channel/health/health.pb.h ) s.files += %w( src/core/ext/filters/client_channel/health/health.pb.h )

@ -310,6 +310,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/server_address.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/server_address.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_pool_interface.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_pool_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/health/health.pb.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/health/health.pb.h" role="src" />

@ -51,6 +51,7 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
@ -174,6 +175,7 @@ class ChannelData {
private: private:
class ConnectivityStateAndPickerSetter; class ConnectivityStateAndPickerSetter;
class ServiceConfigSetter; class ServiceConfigSetter;
class GrpcSubchannel;
class ClientChannelControlHelper; class ClientChannelControlHelper;
class ExternalConnectivityWatcher { class ExternalConnectivityWatcher {
@ -221,7 +223,7 @@ class ChannelData {
~ChannelData(); ~ChannelData();
static bool ProcessResolverResultLocked( static bool ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name, void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error); grpc_error** service_config_error);
@ -270,6 +272,7 @@ class ChannelData {
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_; OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_; grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_; ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_; RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false; bool received_first_resolver_result_ = false;
@ -954,6 +957,65 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
&self->chand_->state_tracker_, self->state_, &self->my_closure_); &self->chand_->state_tracker_, self->state_, &self->my_closure_);
} }
//
// ChannelData::GrpcSubchannel
//
// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name) from
// the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane combiner.
class ChannelData::GrpcSubchannel : public SubchannelInterface {
public:
GrpcSubchannel(Subchannel* subchannel,
UniquePtr<char> health_check_service_name)
: subchannel_(subchannel),
health_check_service_name_(std::move(health_check_service_name)) {}
~GrpcSubchannel() { GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); }
grpc_connectivity_state CheckConnectivityState(
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
override {
RefCountedPtr<ConnectedSubchannel> tmp;
auto retval = subchannel_->CheckConnectivityState(
health_check_service_name_.get(), &tmp);
*connected_subchannel = std::move(tmp);
return retval;
}
void WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<ConnectivityStateWatcher> watcher) override {
subchannel_->WatchConnectivityState(
initial_state,
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
std::move(watcher));
}
void CancelConnectivityStateWatch(
ConnectivityStateWatcher* watcher) override {
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
watcher);
}
void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
channelz::SubchannelNode* channelz_node() override {
return subchannel_->channelz_node();
}
void ResetBackoff() override { subchannel_->ResetBackoff(); }
private:
Subchannel* subchannel_;
UniquePtr<char> health_check_service_name_;
};
// //
// ChannelData::ClientChannelControlHelper // ChannelData::ClientChannelControlHelper
// //
@ -970,15 +1032,26 @@ class ChannelData::ClientChannelControlHelper
"ClientChannelControlHelper"); "ClientChannelControlHelper");
} }
Subchannel* CreateSubchannel(const grpc_channel_args& args) override { RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
UniquePtr<char> health_check_service_name;
if (!inhibit_health_checking) {
health_check_service_name.reset(
gpr_strdup(chand_->health_check_service_name_.get()));
}
static const char* args_to_remove[] = {GRPC_ARG_INHIBIT_HEALTH_CHECKING};
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg( grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get()); chand_->subchannel_pool_.get());
grpc_channel_args* new_args = grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
grpc_channel_args_copy_and_add(&args, &arg, 1); &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
Subchannel* subchannel = Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args); chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
return subchannel; if (subchannel == nullptr) return nullptr;
return MakeRefCounted<GrpcSubchannel>(subchannel,
std::move(health_check_service_name));
} }
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
@ -1211,14 +1284,14 @@ void ChannelData::ProcessLbPolicy(
// Synchronous callback from ResolvingLoadBalancingPolicy to process a // Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update. // resolver result update.
bool ChannelData::ProcessResolverResultLocked( bool ChannelData::ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name, void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error) { grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg); ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config; RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service // If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config. // config, we need a fallback service config.
if (result->service_config_error != GRPC_ERROR_NONE) { if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service // If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service // config. If there is no saved config either, use the default service
// config. // config.
@ -1239,7 +1312,7 @@ bool ChannelData::ProcessResolverResultLocked(
} }
service_config = chand->default_service_config_; service_config = chand->default_service_config_;
} }
} else if (result->service_config == nullptr) { } else if (result.service_config == nullptr) {
if (chand->default_service_config_ != nullptr) { if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -1250,11 +1323,11 @@ bool ChannelData::ProcessResolverResultLocked(
service_config = chand->default_service_config_; service_config = chand->default_service_config_;
} }
} else { } else {
service_config = result->service_config; service_config = result.service_config;
} }
*service_config_error = GRPC_ERROR_REF(result->service_config_error); *service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr && if (service_config == nullptr &&
result->service_config_error != GRPC_ERROR_NONE) { result.service_config_error != GRPC_ERROR_NONE) {
return false; return false;
} }
// Process service config. // Process service config.
@ -1267,19 +1340,6 @@ bool ChannelData::ProcessResolverResultLocked(
service_config->GetGlobalParsedConfig( service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex())); internal::ClientChannelServiceConfigParser::ParserIndex()));
} }
// TODO(roth): Eliminate this hack as part of hiding health check
// service name from LB policy API. As part of this, change the API
// for this function to pass in result as a const reference.
if (parsed_service_config != nullptr &&
parsed_service_config->health_check_service_name() != nullptr) {
grpc_arg new_arg = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(parsed_service_config->health_check_service_name()));
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(result->args, &new_arg, 1);
grpc_channel_args_destroy(result->args);
result->args = new_args;
}
// Check if the config has changed. // Check if the config has changed.
const bool service_config_changed = const bool service_config_changed =
((service_config == nullptr) != ((service_config == nullptr) !=
@ -1296,6 +1356,14 @@ bool ChannelData::ProcessResolverResultLocked(
"chand=%p: resolver returned updated service config: \"%s\"", "chand=%p: resolver returned updated service config: \"%s\"",
chand, service_config_json.get()); chand, service_config_json.get());
} }
// Save health check service name.
if (service_config != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
// Save service config.
chand->saved_service_config_ = std::move(service_config); chand->saved_service_config_ = std::move(service_config);
} }
// We want to set the service config at least once. This should not really be // We want to set the service config at least once. This should not really be
@ -1314,7 +1382,7 @@ bool ChannelData::ProcessResolverResultLocked(
chand->saved_service_config_); chand->saved_service_config_);
} }
UniquePtr<char> processed_lb_policy_name; UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(*result, parsed_service_config, chand->ProcessLbPolicy(result, parsed_service_config,
&processed_lb_policy_name, lb_policy_config); &processed_lb_policy_name, lb_policy_config);
// Swap out the data used by GetChannelInfo(). // Swap out the data used by GetChannelInfo().
{ {
@ -1336,8 +1404,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
LoadBalancingPolicy::PickResult result = LoadBalancingPolicy::PickResult result =
picker_->Pick(LoadBalancingPolicy::PickArgs()); picker_->Pick(LoadBalancingPolicy::PickArgs());
if (result.connected_subchannel != nullptr) { if (result.connected_subchannel != nullptr) {
result.connected_subchannel->Ping(op->send_ping.on_initiate, ConnectedSubchannel* connected_subchannel =
op->send_ping.on_ack); static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
} else { } else {
if (result.error == GRPC_ERROR_NONE) { if (result.error == GRPC_ERROR_NONE) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(

@ -24,7 +24,7 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Used only if type is PICK_COMPLETE. Will be set to the selected /// Used only if type is PICK_COMPLETE. Will be set to the selected
/// subchannel, or nullptr if the LB policy decides to drop the call. /// subchannel, or nullptr if the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel; RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel;
/// Used only if type is PICK_TRANSIENT_FAILURE. /// Used only if type is PICK_TRANSIENT_FAILURE.
/// Error to be set when returning a transient failure. /// Error to be set when returning a transient failure.
@ -184,8 +184,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual ~ChannelControlHelper() = default; virtual ~ChannelControlHelper() = default;
/// Creates a new subchannel with the specified channel args. /// Creates a new subchannel with the specified channel args.
virtual Subchannel* CreateSubchannel(const grpc_channel_args& args) virtual RefCountedPtr<SubchannelInterface> CreateSubchannel(
GRPC_ABSTRACT; const grpc_channel_args& args) GRPC_ABSTRACT;
/// Creates a channel with the specified target and channel args. /// Creates a channel with the specified target and channel args.
/// This can be used in cases where the LB policy needs to create a /// This can be used in cases where the LB policy needs to create a

@ -293,7 +293,8 @@ class GrpcLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<GrpcLb> parent) explicit Helper(RefCountedPtr<GrpcLb> parent)
: parent_(std::move(parent)) {} : parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
@ -620,7 +621,8 @@ bool GrpcLb::Helper::CalledByCurrentChild() const {
return child_ == parent_->child_policy_.get(); return child_ == parent_->child_policy_.get();
} }
Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) { RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ || if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) { (!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr; return nullptr;

@ -68,8 +68,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData( PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list, subchannel_list,
const ServerAddress& address, Subchannel* subchannel) const ServerAddress& address,
: SubchannelData(subchannel_list, address, subchannel) {} RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
void ProcessConnectivityChangeLocked( void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override; grpc_connectivity_state connectivity_state) override;
@ -112,7 +113,8 @@ class PickFirst : public LoadBalancingPolicy {
class Picker : public SubchannelPicker { class Picker : public SubchannelPicker {
public: public:
explicit Picker(RefCountedPtr<ConnectedSubchannel> connected_subchannel) explicit Picker(
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
: connected_subchannel_(std::move(connected_subchannel)) {} : connected_subchannel_(std::move(connected_subchannel)) {}
PickResult Pick(PickArgs args) override { PickResult Pick(PickArgs args) override {
@ -123,7 +125,7 @@ class PickFirst : public LoadBalancingPolicy {
} }
private: private:
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
}; };
// Helper class to ensure that any function that modifies the child refs // Helper class to ensure that any function that modifies the child refs

@ -83,8 +83,9 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData( RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list, subchannel_list,
const ServerAddress& address, Subchannel* subchannel) const ServerAddress& address,
: SubchannelData(subchannel_list, address, subchannel) {} RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
grpc_connectivity_state connectivity_state() const { grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_; return last_connectivity_state_;
@ -156,7 +157,7 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobin* parent_; RoundRobin* parent_;
size_t last_picked_index_; size_t last_picked_index_;
InlinedVector<RefCountedPtr<ConnectedSubchannel>, 10> subchannels_; InlinedVector<RefCountedPtr<ConnectedSubchannelInterface>, 10> subchannels_;
}; };
// Helper class to ensure that any function that modifies the child refs // Helper class to ensure that any function that modifies the child refs

@ -27,7 +27,10 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/server_address.h"
// TODO(roth): Should not need the include of subchannel.h here, since
// that implementation should be hidden from the LB policy API.
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/abstract.h"
@ -88,11 +91,11 @@ class SubchannelData {
} }
// Returns a pointer to the subchannel. // Returns a pointer to the subchannel.
Subchannel* subchannel() const { return subchannel_; } SubchannelInterface* subchannel() const { return subchannel_.get(); }
// Returns the connected subchannel. Will be null if the subchannel // Returns the connected subchannel. Will be null if the subchannel
// is not connected. // is not connected.
ConnectedSubchannel* connected_subchannel() const { ConnectedSubchannelInterface* connected_subchannel() const {
return connected_subchannel_.get(); return connected_subchannel_.get();
} }
@ -102,8 +105,8 @@ class SubchannelData {
// calling CancelConnectivityWatchLocked()). // calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() { grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(pending_watcher_ == nullptr); GPR_ASSERT(pending_watcher_ == nullptr);
connectivity_state_ = subchannel()->CheckConnectivityState( connectivity_state_ =
subchannel_list_->health_check_service_name(), &connected_subchannel_); subchannel()->CheckConnectivityState(&connected_subchannel_);
return connectivity_state_; return connectivity_state_;
} }
@ -128,7 +131,8 @@ class SubchannelData {
protected: protected:
SubchannelData( SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel); const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel);
virtual ~SubchannelData(); virtual ~SubchannelData();
@ -140,7 +144,7 @@ class SubchannelData {
private: private:
// Watcher for subchannel connectivity state. // Watcher for subchannel connectivity state.
class Watcher : public Subchannel::ConnectivityStateWatcher { class Watcher : public SubchannelInterface::ConnectivityStateWatcher {
public: public:
Watcher( Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data, SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
@ -150,9 +154,9 @@ class SubchannelData {
~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
void OnConnectivityStateChange( void OnConnectivityStateChange(grpc_connectivity_state new_state,
grpc_connectivity_state new_state, RefCountedPtr<ConnectedSubchannelInterface>
RefCountedPtr<ConnectedSubchannel> connected_subchannel) override; connected_subchannel) override;
grpc_pollset_set* interested_parties() override { grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties(); return subchannel_list_->policy()->interested_parties();
@ -169,7 +173,7 @@ class SubchannelData {
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list, subchannel_list,
grpc_connectivity_state state, grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel); RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel);
~Updater() { ~Updater() {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor");
@ -182,7 +186,7 @@ class SubchannelData {
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list_; subchannel_list_;
const grpc_connectivity_state state_; const grpc_connectivity_state state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
grpc_closure closure_; grpc_closure closure_;
}; };
@ -196,12 +200,12 @@ class SubchannelData {
// Backpointer to owning subchannel list. Not owned. // Backpointer to owning subchannel list. Not owned.
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_; SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
// The subchannel. // The subchannel.
Subchannel* subchannel_; RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched. // Will be non-null when the subchannel's state is being watched.
Subchannel::ConnectivityStateWatcher* pending_watcher_ = nullptr; SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr;
// Data updated by the watcher. // Data updated by the watcher.
grpc_connectivity_state connectivity_state_; grpc_connectivity_state connectivity_state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
}; };
// A list of subchannels. // A list of subchannels.
@ -235,9 +239,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// Accessors. // Accessors.
LoadBalancingPolicy* policy() const { return policy_; } LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; } TraceFlag* tracer() const { return tracer_; }
const char* health_check_service_name() const {
return health_check_service_name_.get();
}
// Resets connection backoff of all subchannels. // Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving // TODO(roth): We will probably need to rethink this as part of moving
@ -275,8 +276,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
TraceFlag* tracer_; TraceFlag* tracer_;
UniquePtr<char> health_check_service_name_;
grpc_combiner* combiner_; grpc_combiner* combiner_;
// The list of subchannels. // The list of subchannels.
@ -300,7 +299,7 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher:: void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange( OnConnectivityStateChange(
grpc_connectivity_state new_state, grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) {
// Will delete itself. // Will delete itself.
New<Updater>(subchannel_data_, New<Updater>(subchannel_data_,
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"),
@ -314,7 +313,7 @@ SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>> RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list, subchannel_list,
grpc_connectivity_state state, grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
: subchannel_data_(subchannel_data), : subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)), subchannel_list_(std::move(subchannel_list)),
state_(state), state_(state),
@ -336,7 +335,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
"connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p",
sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->tracer()->name(),
sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(),
sd->subchannel_list_->num_subchannels(), sd->subchannel_, sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(),
grpc_connectivity_state_name(self->state_), grpc_connectivity_state_name(self->state_),
self->connected_subchannel_.get(), self->connected_subchannel_.get(),
sd->subchannel_list_->shutting_down(), sd->pending_watcher_); sd->subchannel_list_->shutting_down(), sd->pending_watcher_);
@ -360,9 +359,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel) const ServerAddress& address, RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list), : subchannel_list_(subchannel_list),
subchannel_(subchannel), subchannel_(std::move(subchannel)),
// We assume that the current state is IDLE. If not, we'll get a // We assume that the current state is IDLE. If not, we'll get a
// callback telling us that. // callback telling us that.
connectivity_state_(GRPC_CHANNEL_IDLE) {} connectivity_state_(GRPC_CHANNEL_IDLE) {}
@ -382,10 +381,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
" (subchannel %p): unreffing subchannel", " (subchannel %p): unreffing subchannel",
subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_); subchannel_.get());
} }
GRPC_SUBCHANNEL_UNREF(subchannel_, reason); subchannel_.reset();
subchannel_ = nullptr;
connected_subchannel_.reset(); connected_subchannel_.reset();
} }
} }
@ -407,16 +405,16 @@ void SubchannelData<SubchannelListType,
" (subchannel %p): starting watch (from %s)", " (subchannel %p): starting watch (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_, grpc_connectivity_state_name(connectivity_state_)); subchannel_.get(),
grpc_connectivity_state_name(connectivity_state_));
} }
GPR_ASSERT(pending_watcher_ == nullptr); GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ = pending_watcher_ =
New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState( subchannel_->WatchConnectivityState(
connectivity_state_, connectivity_state_,
UniquePtr<char>( UniquePtr<SubchannelInterface::ConnectivityStateWatcher>(
gpr_strdup(subchannel_list_->health_check_service_name())), pending_watcher_));
UniquePtr<Subchannel::ConnectivityStateWatcher>(pending_watcher_));
} }
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>
@ -428,11 +426,10 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
" (subchannel %p): canceling connectivity watch (%s)", " (subchannel %p): canceling connectivity watch (%s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_, reason); subchannel_.get(), reason);
} }
if (pending_watcher_ != nullptr) { if (pending_watcher_ != nullptr) {
subchannel_->CancelConnectivityStateWatch( subchannel_->CancelConnectivityStateWatch(pending_watcher_);
subchannel_list_->health_check_service_name(), pending_watcher_);
pending_watcher_ = nullptr; pending_watcher_ = nullptr;
} }
} }
@ -463,25 +460,12 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
tracer_->name(), policy, this, addresses.size()); tracer_->name(), policy, this, addresses.size());
} }
subchannels_.reserve(addresses.size()); subchannels_.reserve(addresses.size());
// Find health check service name.
const bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
if (!inhibit_health_checking) {
const char* health_check_service_name = grpc_channel_arg_get_string(
grpc_channel_args_find(&args, "grpc.temp.health_check"));
if (health_check_service_name != nullptr) {
health_check_service_name_.reset(gpr_strdup(health_check_service_name));
}
}
// We need to remove the LB addresses in order to be able to compare the // We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses. // subchannel keys of subchannels from a different batch of addresses.
// We also remove the health-checking-related args, since we are
// handling that here.
// We remove the service config, since it will be passed into the // We remove the service config, since it will be passed into the
// subchannel via call context. // subchannel via call context.
static const char* keys_to_remove[] = { static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_SUBCHANNEL_ADDRESS, "grpc.temp.health_check", GRPC_ARG_SERVICE_CONFIG};
GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_SERVICE_CONFIG};
// Create a subchannel for each address. // Create a subchannel for each address.
for (size_t i = 0; i < addresses.size(); i++) { for (size_t i = 0; i < addresses.size(); i++) {
// TODO(roth): we should ideally hide this from the LB policy code. In // TODO(roth): we should ideally hide this from the LB policy code. In
@ -504,7 +488,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size()); args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[subchannel_address_arg_index].value.string); gpr_free(args_to_add[subchannel_address_arg_index].value.string);
Subchannel* subchannel = helper->CreateSubchannel(*new_args); RefCountedPtr<SubchannelInterface> subchannel =
helper->CreateSubchannel(*new_args);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) { if (subchannel == nullptr) {
// Subchannel could not be created. // Subchannel could not be created.
@ -523,11 +508,11 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR "[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s", ": Created subchannel %p for address uri %s",
tracer_->name(), policy_, this, subchannels_.size(), subchannel, tracer_->name(), policy_, this, subchannels_.size(),
address_uri); subchannel.get(), address_uri);
gpr_free(address_uri); gpr_free(address_uri);
} }
subchannels_.emplace_back(this, addresses[i], subchannel); subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
} }
} }

@ -334,7 +334,8 @@ class XdsLb : public LoadBalancingPolicy {
explicit FallbackHelper(RefCountedPtr<XdsLb> parent) explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {} : parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
@ -374,7 +375,8 @@ class XdsLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<LocalityEntry> entry) explicit Helper(RefCountedPtr<LocalityEntry> entry)
: entry_(std::move(entry)) {} : entry_(std::move(entry)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
@ -579,7 +581,7 @@ bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
return child_ == parent_->fallback_policy_.get(); return child_ == parent_->fallback_policy_.get();
} }
Subchannel* XdsLb::FallbackHelper::CreateSubchannel( RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (parent_->shutting_down_ || if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) { (!CalledByPendingFallback() && !CalledByCurrentFallback())) {
@ -1997,7 +1999,8 @@ bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
return child_ == entry_->child_policy_.get(); return child_ == entry_->child_policy_.get();
} }
Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface>
XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (entry_->parent_->shutting_down_ || if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) { (!CalledByPendingChild() && !CalledByCurrentChild())) {

@ -106,7 +106,8 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
RefCountedPtr<ResolvingLoadBalancingPolicy> parent) RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {} : parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override { RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(args);
@ -536,7 +537,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
if (process_resolver_result_ != nullptr) { if (process_resolver_result_ != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE; grpc_error* service_config_error = GRPC_ERROR_NONE;
service_config_changed = process_resolver_result_( service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, &result, &lb_policy_name, process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &service_config_error); &lb_policy_config, &service_config_error);
if (service_config_error != GRPC_ERROR_NONE) { if (service_config_error != GRPC_ERROR_NONE) {
service_config_error_string = service_config_error_string =

@ -69,7 +69,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// empty, it means that we don't have a valid service config to use, and we // empty, it means that we don't have a valid service config to use, and we
// should set the channel to be in TRANSIENT_FAILURE. // should set the channel to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)( typedef bool (*ProcessResolverResultCallback)(
void* user_data, Resolver::Result* result, const char** lb_policy_name, void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error); grpc_error** service_config_error);
// If error is set when this returns, then construction failed, and // If error is set when this returns, then construction failed, and

@ -83,7 +83,7 @@ ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args, grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel, RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
intptr_t socket_uuid) intptr_t socket_uuid)
: RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount), : ConnectedSubchannelInterface(&grpc_trace_stream_refcount),
channel_stack_(channel_stack), channel_stack_(channel_stack),
args_(grpc_channel_args_copy(args)), args_(grpc_channel_args_copy(args)),
channelz_subchannel_(std::move(channelz_subchannel)), channelz_subchannel_(std::move(channelz_subchannel)),
@ -376,25 +376,17 @@ class Subchannel::ConnectedSubchannelStateWatcher {
void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
UniquePtr<ConnectivityStateWatcher> watcher) { UniquePtr<ConnectivityStateWatcher> watcher) {
watcher->next_ = head_; watchers_.insert(MakePair(watcher.get(), std::move(watcher)));
head_ = watcher.release();
} }
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
ConnectivityStateWatcher* watcher) { ConnectivityStateWatcher* watcher) {
for (ConnectivityStateWatcher** w = &head_; *w != nullptr; w = &(*w)->next_) { watchers_.erase(watcher);
if (*w == watcher) {
*w = watcher->next_;
Delete(watcher);
return;
}
}
GPR_UNREACHABLE_CODE(return );
} }
void Subchannel::ConnectivityStateWatcherList::NotifyLocked( void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
Subchannel* subchannel, grpc_connectivity_state state) { Subchannel* subchannel, grpc_connectivity_state state) {
for (ConnectivityStateWatcher* w = head_; w != nullptr; w = w->next_) { for (const auto& p : watchers_) {
RefCountedPtr<ConnectedSubchannel> connected_subchannel; RefCountedPtr<ConnectedSubchannel> connected_subchannel;
if (state == GRPC_CHANNEL_READY) { if (state == GRPC_CHANNEL_READY) {
connected_subchannel = subchannel->connected_subchannel_; connected_subchannel = subchannel->connected_subchannel_;
@ -407,15 +399,7 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
// the notification into the client_channel control-plane combiner // the notification into the client_channel control-plane combiner
// before processing it. But if we ever have any other callers here, // before processing it. But if we ever have any other callers here,
// we will probably need to change this. // we will probably need to change this.
w->OnConnectivityStateChange(state, std::move(connected_subchannel)); p.second->OnConnectivityStateChange(state, std::move(connected_subchannel));
}
}
void Subchannel::ConnectivityStateWatcherList::Clear() {
while (head_ != nullptr) {
ConnectivityStateWatcher* next = head_->next_;
Delete(head_);
head_ = next;
} }
} }

@ -23,6 +23,7 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h" #include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
@ -69,7 +70,7 @@ namespace grpc_core {
class SubchannelCall; class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> { class ConnectedSubchannel : public ConnectedSubchannelInterface {
public: public:
struct CallArgs { struct CallArgs {
grpc_polling_entity* pollent; grpc_polling_entity* pollent;
@ -96,7 +97,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_error** error); grpc_error** error);
grpc_channel_stack* channel_stack() const { return channel_stack_; } grpc_channel_stack* channel_stack() const { return channel_stack_; }
const grpc_channel_args* args() const { return args_; } const grpc_channel_args* args() const override { return args_; }
channelz::SubchannelNode* channelz_subchannel() const { channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get(); return channelz_subchannel_.get();
} }
@ -176,37 +177,9 @@ class SubchannelCall {
// A subchannel that knows how to connect to exactly one target address. It // A subchannel that knows how to connect to exactly one target address. It
// provides a target for load balancing. // provides a target for load balancing.
class Subchannel { class Subchannel {
private:
class ConnectivityStateWatcherList; // Forward declaration.
public: public:
class ConnectivityStateWatcher { typedef SubchannelInterface::ConnectivityStateWatcher
public: ConnectivityStateWatcher;
virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
//
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
GRPC_ABSTRACT;
virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
private:
// For access to next_.
friend class Subchannel::ConnectivityStateWatcherList;
ConnectivityStateWatcher* next_ = nullptr;
};
// The ctor and dtor are not intended to use directly. // The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector, Subchannel(SubchannelKey* key, grpc_connector* connector,
@ -296,12 +269,15 @@ class Subchannel {
// Notifies all watchers in the list about a change to state. // Notifies all watchers in the list about a change to state.
void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state); void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
void Clear(); void Clear() { watchers_.clear(); }
bool empty() const { return head_ == nullptr; } bool empty() const { return watchers_.empty(); }
private: private:
ConnectivityStateWatcher* head_ = nullptr; // TODO(roth): This could be a set instead of a map if we had a set
// implementation.
Map<ConnectivityStateWatcher*, UniquePtr<ConnectivityStateWatcher>>
watchers_;
}; };
// A map that tracks ConnectivityStateWatchers using a particular health // A map that tracks ConnectivityStateWatchers using a particular health

@ -0,0 +1,109 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc_core {
// TODO(roth): In a subsequent PR, remove this from this API.
class ConnectedSubchannelInterface
: public RefCounted<ConnectedSubchannelInterface> {
public:
virtual const grpc_channel_args* args() const GRPC_ABSTRACT;
protected:
template <typename TraceFlagT = TraceFlag>
explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr)
: RefCounted<ConnectedSubchannelInterface>(trace_flag) {}
};
class SubchannelInterface : public RefCounted<SubchannelInterface> {
public:
class ConnectivityStateWatcher {
public:
virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
//
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannelInterface>
connected_subchannel) // NOLINT
GRPC_ABSTRACT;
// TODO(roth): Remove this as soon as we move to EventManager-based
// polling.
virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
virtual ~SubchannelInterface() = default;
// Returns the current connectivity state of the subchannel.
virtual grpc_connectivity_state CheckConnectivityState(
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
GRPC_ABSTRACT;
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
// subchannel's connectivity state becomes a value other than
// initial_state, which may happen immediately.
// Subsequent callbacks will be delivered as the subchannel's state
// changes.
// The watcher will be destroyed either when the subchannel is
// destroyed or when CancelConnectivityStateWatch() is called.
// There can be only one watcher of a given subchannel. It is not
// valid to call this method a second time without first cancelling
// the previous watcher using CancelConnectivityStateWatch().
virtual void WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<ConnectivityStateWatcher> watcher) GRPC_ABSTRACT;
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher)
GRPC_ABSTRACT;
// Attempt to connect to the backend. Has no effect if already connected.
virtual void AttemptToConnect() GRPC_ABSTRACT;
// TODO(roth): These methods should be removed from this interface to
// bettter hide grpc-specific functionality from the LB policy API.
virtual channelz::SubchannelNode* channelz_node() GRPC_ABSTRACT;
virtual void ResetBackoff() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H */

@ -143,7 +143,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
InterceptRecvTrailingMetadataCallback cb, void* user_data) InterceptRecvTrailingMetadataCallback cb, void* user_data)
: parent_(std::move(parent)), cb_(cb), user_data_(user_data) {} : parent_(std::move(parent)), cb_(cb), user_data_(user_data) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override { RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(args);
} }

@ -974,6 +974,7 @@ src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \ src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel.h \ src/core/ext/filters/client_channel/subchannel.h \
src/core/ext/filters/client_channel/subchannel_interface.h \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.h \ src/core/ext/filters/client_channel/subchannel_pool_interface.h \
src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/deadline/deadline_filter.cc \

@ -8912,6 +8912,7 @@
"src/core/ext/filters/client_channel/server_address.h", "src/core/ext/filters/client_channel/server_address.h",
"src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.h", "src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h" "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
], ],
"is_filegroup": true, "is_filegroup": true,
@ -8968,6 +8969,7 @@
"src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.cc", "src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel.h", "src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h" "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
], ],

Loading…
Cancel
Save