Use SubchannelInterface to hide implementation from LB policy API.

pull/18917/head
Mark D. Roth 6 years ago
parent 9ecce9fd22
commit 28ccd61cf5
  1. 1
      BUILD
  2. 1
      BUILD.gn
  3. 1
      build.yaml
  4. 1
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 123
      src/core/ext/filters/client_channel/client_channel.cc
  9. 8
      src/core/ext/filters/client_channel/lb_policy.h
  10. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  11. 10
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  12. 7
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  13. 91
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  14. 11
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  15. 5
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  16. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  17. 26
      src/core/ext/filters/client_channel/subchannel.cc
  18. 46
      src/core/ext/filters/client_channel/subchannel.h
  19. 109
      src/core/ext/filters/client_channel/subchannel_interface.h
  20. 3
      test/core/util/test_lb_policies.cc
  21. 1
      tools/doxygen/Doxyfile.core.internal
  22. 2
      tools/run_tests/generated/sources_and_headers.json

@ -1143,6 +1143,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/server_address.h",
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h",
],
language = "c++",

@ -345,6 +345,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h",
"src/core/ext/filters/deadline/deadline_filter.cc",

@ -595,6 +595,7 @@ filegroups:
- src/core/ext/filters/client_channel/server_address.h
- src/core/ext/filters/client_channel/service_config.h
- src/core/ext/filters/client_channel/subchannel.h
- src/core/ext/filters/client_channel/subchannel_interface.h
- src/core/ext/filters/client_channel/subchannel_pool_interface.h
src:
- src/core/ext/filters/client_channel/backup_poller.cc

@ -399,6 +399,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h',

@ -371,6 +371,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h',
@ -1023,6 +1024,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/server_address.h',
'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/client_channel/health/health.pb.h',

@ -305,6 +305,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/server_address.h )
s.files += %w( src/core/ext/filters/client_channel/service_config.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel_interface.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h )
s.files += %w( src/core/ext/filters/deadline/deadline_filter.h )
s.files += %w( src/core/ext/filters/client_channel/health/health.pb.h )

@ -310,6 +310,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/server_address.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_pool_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/health/health.pb.h" role="src" />

@ -51,6 +51,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
@ -174,6 +175,7 @@ class ChannelData {
private:
class ConnectivityStateAndPickerSetter;
class ServiceConfigSetter;
class GrpcSubchannel;
class ClientChannelControlHelper;
class ExternalConnectivityWatcher {
@ -221,7 +223,7 @@ class ChannelData {
~ChannelData();
static bool ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error);
@ -270,6 +272,7 @@ class ChannelData {
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;
@ -954,6 +957,65 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
&self->chand_->state_tracker_, self->state_, &self->my_closure_);
}
//
// ChannelData::GrpcSubchannel
//
// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name) from
// the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane combiner.
class ChannelData::GrpcSubchannel : public SubchannelInterface {
public:
GrpcSubchannel(Subchannel* subchannel,
UniquePtr<char> health_check_service_name)
: subchannel_(subchannel),
health_check_service_name_(std::move(health_check_service_name)) {}
~GrpcSubchannel() { GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); }
grpc_connectivity_state CheckConnectivityState(
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
override {
RefCountedPtr<ConnectedSubchannel> tmp;
auto retval = subchannel_->CheckConnectivityState(
health_check_service_name_.get(), &tmp);
*connected_subchannel = std::move(tmp);
return retval;
}
void WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<ConnectivityStateWatcher> watcher) override {
subchannel_->WatchConnectivityState(
initial_state,
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
std::move(watcher));
}
void CancelConnectivityStateWatch(
ConnectivityStateWatcher* watcher) override {
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
watcher);
}
void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
channelz::SubchannelNode* channelz_node() override {
return subchannel_->channelz_node();
}
void ResetBackoff() override { subchannel_->ResetBackoff(); }
private:
Subchannel* subchannel_;
UniquePtr<char> health_check_service_name_;
};
//
// ChannelData::ClientChannelControlHelper
//
@ -970,15 +1032,26 @@ class ChannelData::ClientChannelControlHelper
"ClientChannelControlHelper");
}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
UniquePtr<char> health_check_service_name;
if (!inhibit_health_checking) {
health_check_service_name.reset(
gpr_strdup(chand_->health_check_service_name_.get()));
}
static const char* args_to_remove[] = {GRPC_ARG_INHIBIT_HEALTH_CHECKING};
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &arg, 1);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
return subchannel;
if (subchannel == nullptr) return nullptr;
return MakeRefCounted<GrpcSubchannel>(subchannel,
std::move(health_check_service_name));
}
grpc_channel* CreateChannel(const char* target,
@ -1211,14 +1284,14 @@ void ChannelData::ProcessLbPolicy(
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config.
if (result->service_config_error != GRPC_ERROR_NONE) {
if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service
// config.
@ -1239,7 +1312,7 @@ bool ChannelData::ProcessResolverResultLocked(
}
service_config = chand->default_service_config_;
}
} else if (result->service_config == nullptr) {
} else if (result.service_config == nullptr) {
if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1250,11 +1323,11 @@ bool ChannelData::ProcessResolverResultLocked(
service_config = chand->default_service_config_;
}
} else {
service_config = result->service_config;
service_config = result.service_config;
}
*service_config_error = GRPC_ERROR_REF(result->service_config_error);
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr &&
result->service_config_error != GRPC_ERROR_NONE) {
result.service_config_error != GRPC_ERROR_NONE) {
return false;
}
// Process service config.
@ -1267,19 +1340,6 @@ bool ChannelData::ProcessResolverResultLocked(
service_config->GetGlobalParsedConfig(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
// TODO(roth): Eliminate this hack as part of hiding health check
// service name from LB policy API. As part of this, change the API
// for this function to pass in result as a const reference.
if (parsed_service_config != nullptr &&
parsed_service_config->health_check_service_name() != nullptr) {
grpc_arg new_arg = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(parsed_service_config->health_check_service_name()));
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(result->args, &new_arg, 1);
grpc_channel_args_destroy(result->args);
result->args = new_args;
}
// Check if the config has changed.
const bool service_config_changed =
((service_config == nullptr) !=
@ -1296,6 +1356,14 @@ bool ChannelData::ProcessResolverResultLocked(
"chand=%p: resolver returned updated service config: \"%s\"",
chand, service_config_json.get());
}
// Save health check service name.
if (service_config != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
// Save service config.
chand->saved_service_config_ = std::move(service_config);
}
// We want to set the service config at least once. This should not really be
@ -1314,7 +1382,7 @@ bool ChannelData::ProcessResolverResultLocked(
chand->saved_service_config_);
}
UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(*result, parsed_service_config,
chand->ProcessLbPolicy(result, parsed_service_config,
&processed_lb_policy_name, lb_policy_config);
// Swap out the data used by GetChannelInfo().
{
@ -1336,8 +1404,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
LoadBalancingPolicy::PickResult result =
picker_->Pick(LoadBalancingPolicy::PickArgs());
if (result.connected_subchannel != nullptr) {
result.connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
ConnectedSubchannel* connected_subchannel =
static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
} else {
if (result.error == GRPC_ERROR_NONE) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(

@ -24,7 +24,7 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Used only if type is PICK_COMPLETE. Will be set to the selected
/// subchannel, or nullptr if the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel;
/// Used only if type is PICK_TRANSIENT_FAILURE.
/// Error to be set when returning a transient failure.
@ -184,8 +184,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual ~ChannelControlHelper() = default;
/// Creates a new subchannel with the specified channel args.
virtual Subchannel* CreateSubchannel(const grpc_channel_args& args)
GRPC_ABSTRACT;
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) GRPC_ABSTRACT;
/// Creates a channel with the specified target and channel args.
/// This can be used in cases where the LB policy needs to create a

@ -293,7 +293,8 @@ class GrpcLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<GrpcLb> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
@ -620,7 +621,8 @@ bool GrpcLb::Helper::CalledByCurrentChild() const {
return child_ == parent_->child_policy_.get();
}
Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;

@ -68,8 +68,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
@ -112,7 +113,8 @@ class PickFirst : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
explicit Picker(RefCountedPtr<ConnectedSubchannel> connected_subchannel)
explicit Picker(
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
: connected_subchannel_(std::move(connected_subchannel)) {}
PickResult Pick(PickArgs args) override {
@ -123,7 +125,7 @@ class PickFirst : public LoadBalancingPolicy {
}
private:
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
// Helper class to ensure that any function that modifies the child refs

@ -83,8 +83,9 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
@ -156,7 +157,7 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobin* parent_;
size_t last_picked_index_;
InlinedVector<RefCountedPtr<ConnectedSubchannel>, 10> subchannels_;
InlinedVector<RefCountedPtr<ConnectedSubchannelInterface>, 10> subchannels_;
};
// Helper class to ensure that any function that modifies the child refs

@ -27,7 +27,10 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
// TODO(roth): Should not need the include of subchannel.h here, since
// that implementation should be hidden from the LB policy API.
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h"
@ -88,11 +91,11 @@ class SubchannelData {
}
// Returns a pointer to the subchannel.
Subchannel* subchannel() const { return subchannel_; }
SubchannelInterface* subchannel() const { return subchannel_.get(); }
// Returns the connected subchannel. Will be null if the subchannel
// is not connected.
ConnectedSubchannel* connected_subchannel() const {
ConnectedSubchannelInterface* connected_subchannel() const {
return connected_subchannel_.get();
}
@ -102,8 +105,8 @@ class SubchannelData {
// calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(pending_watcher_ == nullptr);
connectivity_state_ = subchannel()->CheckConnectivityState(
subchannel_list_->health_check_service_name(), &connected_subchannel_);
connectivity_state_ =
subchannel()->CheckConnectivityState(&connected_subchannel_);
return connectivity_state_;
}
@ -128,7 +131,8 @@ class SubchannelData {
protected:
SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel);
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel);
virtual ~SubchannelData();
@ -140,7 +144,7 @@ class SubchannelData {
private:
// Watcher for subchannel connectivity state.
class Watcher : public Subchannel::ConnectivityStateWatcher {
class Watcher : public SubchannelInterface::ConnectivityStateWatcher {
public:
Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
@ -150,9 +154,9 @@ class SubchannelData {
~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) override;
void OnConnectivityStateChange(grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannelInterface>
connected_subchannel) override;
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties();
@ -169,7 +173,7 @@ class SubchannelData {
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list,
grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel);
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel);
~Updater() {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor");
@ -182,7 +186,7 @@ class SubchannelData {
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list_;
const grpc_connectivity_state state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
grpc_closure closure_;
};
@ -196,12 +200,12 @@ class SubchannelData {
// Backpointer to owning subchannel list. Not owned.
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
// The subchannel.
Subchannel* subchannel_;
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
Subchannel::ConnectivityStateWatcher* pending_watcher_ = nullptr;
SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr;
// Data updated by the watcher.
grpc_connectivity_state connectivity_state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
// A list of subchannels.
@ -235,9 +239,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
const char* health_check_service_name() const {
return health_check_service_name_.get();
}
// Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving
@ -275,8 +276,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
TraceFlag* tracer_;
UniquePtr<char> health_check_service_name_;
grpc_combiner* combiner_;
// The list of subchannels.
@ -300,7 +299,7 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel) {
// Will delete itself.
New<Updater>(subchannel_data_,
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"),
@ -314,7 +313,7 @@ SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list,
grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel)
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)),
state_(state),
@ -336,7 +335,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
"connected_subchannel=%p, shutting_down=%d, pending_watcher=%p",
sd->subchannel_list_->tracer()->name(),
sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(),
sd->subchannel_list_->num_subchannels(), sd->subchannel_,
sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(),
grpc_connectivity_state_name(self->state_),
self->connected_subchannel_.get(),
sd->subchannel_list_->shutting_down(), sd->pending_watcher_);
@ -360,9 +359,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel)
const ServerAddress& address, RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list),
subchannel_(subchannel),
subchannel_(std::move(subchannel)),
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
connectivity_state_(GRPC_CHANNEL_IDLE) {}
@ -382,10 +381,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
" (subchannel %p): unreffing subchannel",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_);
subchannel_.get());
}
GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
subchannel_ = nullptr;
subchannel_.reset();
connected_subchannel_.reset();
}
}
@ -407,16 +405,16 @@ void SubchannelData<SubchannelListType,
" (subchannel %p): starting watch (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_, grpc_connectivity_state_name(connectivity_state_));
subchannel_.get(),
grpc_connectivity_state_name(connectivity_state_));
}
GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ =
New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
connectivity_state_,
UniquePtr<char>(
gpr_strdup(subchannel_list_->health_check_service_name())),
UniquePtr<Subchannel::ConnectivityStateWatcher>(pending_watcher_));
UniquePtr<SubchannelInterface::ConnectivityStateWatcher>(
pending_watcher_));
}
template <typename SubchannelListType, typename SubchannelDataType>
@ -428,11 +426,10 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
" (subchannel %p): canceling connectivity watch (%s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_, reason);
subchannel_.get(), reason);
}
if (pending_watcher_ != nullptr) {
subchannel_->CancelConnectivityStateWatch(
subchannel_list_->health_check_service_name(), pending_watcher_);
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
pending_watcher_ = nullptr;
}
}
@ -463,25 +460,12 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
tracer_->name(), policy, this, addresses.size());
}
subchannels_.reserve(addresses.size());
// Find health check service name.
const bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
if (!inhibit_health_checking) {
const char* health_check_service_name = grpc_channel_arg_get_string(
grpc_channel_args_find(&args, "grpc.temp.health_check"));
if (health_check_service_name != nullptr) {
health_check_service_name_.reset(gpr_strdup(health_check_service_name));
}
}
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
// We also remove the health-checking-related args, since we are
// handling that here.
// We remove the service config, since it will be passed into the
// subchannel via call context.
static const char* keys_to_remove[] = {
GRPC_ARG_SUBCHANNEL_ADDRESS, "grpc.temp.health_check",
GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_SERVICE_CONFIG};
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_SERVICE_CONFIG};
// Create a subchannel for each address.
for (size_t i = 0; i < addresses.size(); i++) {
// TODO(roth): we should ideally hide this from the LB policy code. In
@ -504,7 +488,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[subchannel_address_arg_index].value.string);
Subchannel* subchannel = helper->CreateSubchannel(*new_args);
RefCountedPtr<SubchannelInterface> subchannel =
helper->CreateSubchannel(*new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.
@ -523,11 +508,11 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s",
tracer_->name(), policy_, this, subchannels_.size(), subchannel,
address_uri);
tracer_->name(), policy_, this, subchannels_.size(),
subchannel.get(), address_uri);
gpr_free(address_uri);
}
subchannels_.emplace_back(this, addresses[i], subchannel);
subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
}
}

@ -334,7 +334,8 @@ class XdsLb : public LoadBalancingPolicy {
explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
@ -374,7 +375,8 @@ class XdsLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<LocalityEntry> entry)
: entry_(std::move(entry)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
@ -579,7 +581,7 @@ bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
return child_ == parent_->fallback_policy_.get();
}
Subchannel* XdsLb::FallbackHelper::CreateSubchannel(
RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
@ -1997,7 +1999,8 @@ bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
return child_ == entry_->child_policy_.get();
}
Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
RefCountedPtr<SubchannelInterface>
XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {

@ -106,7 +106,8 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
@ -536,7 +537,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
if (process_resolver_result_ != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE;
service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, &result, &lb_policy_name,
process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
service_config_error_string =

@ -69,7 +69,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// empty, it means that we don't have a valid service config to use, and we
// should set the channel to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)(
void* user_data, Resolver::Result* result, const char** lb_policy_name,
void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error);
// If error is set when this returns, then construction failed, and

@ -83,7 +83,7 @@ ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
intptr_t socket_uuid)
: RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount),
: ConnectedSubchannelInterface(&grpc_trace_stream_refcount),
channel_stack_(channel_stack),
args_(grpc_channel_args_copy(args)),
channelz_subchannel_(std::move(channelz_subchannel)),
@ -376,25 +376,17 @@ class Subchannel::ConnectedSubchannelStateWatcher {
void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
UniquePtr<ConnectivityStateWatcher> watcher) {
watcher->next_ = head_;
head_ = watcher.release();
watchers_.insert(MakePair(watcher.get(), std::move(watcher)));
}
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
ConnectivityStateWatcher* watcher) {
for (ConnectivityStateWatcher** w = &head_; *w != nullptr; w = &(*w)->next_) {
if (*w == watcher) {
*w = watcher->next_;
Delete(watcher);
return;
}
}
GPR_UNREACHABLE_CODE(return );
watchers_.erase(watcher);
}
void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
Subchannel* subchannel, grpc_connectivity_state state) {
for (ConnectivityStateWatcher* w = head_; w != nullptr; w = w->next_) {
for (const auto& p : watchers_) {
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
if (state == GRPC_CHANNEL_READY) {
connected_subchannel = subchannel->connected_subchannel_;
@ -407,15 +399,7 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
// the notification into the client_channel control-plane combiner
// before processing it. But if we ever have any other callers here,
// we will probably need to change this.
w->OnConnectivityStateChange(state, std::move(connected_subchannel));
}
}
void Subchannel::ConnectivityStateWatcherList::Clear() {
while (head_ != nullptr) {
ConnectivityStateWatcher* next = head_->next_;
Delete(head_);
head_ = next;
p.second->OnConnectivityStateChange(state, std::move(connected_subchannel));
}
}

@ -23,6 +23,7 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
@ -69,7 +70,7 @@ namespace grpc_core {
class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
class ConnectedSubchannel : public ConnectedSubchannelInterface {
public:
struct CallArgs {
grpc_polling_entity* pollent;
@ -96,7 +97,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_error** error);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
const grpc_channel_args* args() const { return args_; }
const grpc_channel_args* args() const override { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}
@ -176,37 +177,9 @@ class SubchannelCall {
// A subchannel that knows how to connect to exactly one target address. It
// provides a target for load balancing.
class Subchannel {
private:
class ConnectivityStateWatcherList; // Forward declaration.
public:
class ConnectivityStateWatcher {
public:
virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
//
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
GRPC_ABSTRACT;
virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
private:
// For access to next_.
friend class Subchannel::ConnectivityStateWatcherList;
ConnectivityStateWatcher* next_ = nullptr;
};
typedef SubchannelInterface::ConnectivityStateWatcher
ConnectivityStateWatcher;
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
@ -296,12 +269,15 @@ class Subchannel {
// Notifies all watchers in the list about a change to state.
void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
void Clear();
void Clear() { watchers_.clear(); }
bool empty() const { return head_ == nullptr; }
bool empty() const { return watchers_.empty(); }
private:
ConnectivityStateWatcher* head_ = nullptr;
// TODO(roth): This could be a set instead of a map if we had a set
// implementation.
Map<ConnectivityStateWatcher*, UniquePtr<ConnectivityStateWatcher>>
watchers_;
};
// A map that tracks ConnectivityStateWatchers using a particular health

@ -0,0 +1,109 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc_core {
// TODO(roth): In a subsequent PR, remove this from this API.
class ConnectedSubchannelInterface
: public RefCounted<ConnectedSubchannelInterface> {
public:
virtual const grpc_channel_args* args() const GRPC_ABSTRACT;
protected:
template <typename TraceFlagT = TraceFlag>
explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr)
: RefCounted<ConnectedSubchannelInterface>(trace_flag) {}
};
class SubchannelInterface : public RefCounted<SubchannelInterface> {
public:
class ConnectivityStateWatcher {
public:
virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
//
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannelInterface>
connected_subchannel) // NOLINT
GRPC_ABSTRACT;
// TODO(roth): Remove this as soon as we move to EventManager-based
// polling.
virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
virtual ~SubchannelInterface() = default;
// Returns the current connectivity state of the subchannel.
virtual grpc_connectivity_state CheckConnectivityState(
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
GRPC_ABSTRACT;
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
// subchannel's connectivity state becomes a value other than
// initial_state, which may happen immediately.
// Subsequent callbacks will be delivered as the subchannel's state
// changes.
// The watcher will be destroyed either when the subchannel is
// destroyed or when CancelConnectivityStateWatch() is called.
// There can be only one watcher of a given subchannel. It is not
// valid to call this method a second time without first cancelling
// the previous watcher using CancelConnectivityStateWatch().
virtual void WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<ConnectivityStateWatcher> watcher) GRPC_ABSTRACT;
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher)
GRPC_ABSTRACT;
// Attempt to connect to the backend. Has no effect if already connected.
virtual void AttemptToConnect() GRPC_ABSTRACT;
// TODO(roth): These methods should be removed from this interface to
// bettter hide grpc-specific functionality from the LB policy API.
virtual channelz::SubchannelNode* channelz_node() GRPC_ABSTRACT;
virtual void ResetBackoff() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H */

@ -143,7 +143,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
InterceptRecvTrailingMetadataCallback cb, void* user_data)
: parent_(std::move(parent)), cb_(cb), user_data_(user_data) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateSubchannel(args);
}

@ -974,6 +974,7 @@ src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel.h \
src/core/ext/filters/client_channel/subchannel_interface.h \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.h \
src/core/ext/filters/deadline/deadline_filter.cc \

@ -8912,6 +8912,7 @@
"src/core/ext/filters/client_channel/server_address.h",
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h"
],
"is_filegroup": true,
@ -8968,6 +8969,7 @@
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h"
],

Loading…
Cancel
Save