diff --git a/BUILD b/BUILD index 63744760bfd..e0827d46cb1 100644 --- a/BUILD +++ b/BUILD @@ -1143,6 +1143,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/server_address.h", "src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/subchannel.h", + "src/core/ext/filters/client_channel/subchannel_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.h", ], language = "c++", diff --git a/BUILD.gn b/BUILD.gn index 14e5c3f0b5f..258cdc9f873 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -345,6 +345,7 @@ config("grpc_config") { "src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/subchannel.cc", "src/core/ext/filters/client_channel/subchannel.h", + "src/core/ext/filters/client_channel/subchannel_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.h", "src/core/ext/filters/deadline/deadline_filter.cc", diff --git a/build.yaml b/build.yaml index 529afecbb33..5c085f18787 100644 --- a/build.yaml +++ b/build.yaml @@ -595,6 +595,7 @@ filegroups: - src/core/ext/filters/client_channel/server_address.h - src/core/ext/filters/client_channel/service_config.h - src/core/ext/filters/client_channel/subchannel.h + - src/core/ext/filters/client_channel/subchannel_interface.h - src/core/ext/filters/client_channel/subchannel_pool_interface.h src: - src/core/ext/filters/client_channel/backup_poller.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 0ce31532629..3b463cf7b90 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -399,6 +399,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/subchannel.h', + 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/client_channel/health/health.pb.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 8ad00aad096..85add8fb7a2 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -371,6 +371,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/subchannel.h', + 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/client_channel/health/health.pb.h', @@ -1023,6 +1024,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/server_address.h', 'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/subchannel.h', + 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/client_channel/health/health.pb.h', diff --git a/grpc.gemspec b/grpc.gemspec index 0bbf10fb361..e9d215bf10f 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -305,6 +305,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/server_address.h ) s.files += %w( src/core/ext/filters/client_channel/service_config.h ) s.files += %w( src/core/ext/filters/client_channel/subchannel.h ) + s.files += %w( src/core/ext/filters/client_channel/subchannel_interface.h ) s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h ) s.files += %w( src/core/ext/filters/deadline/deadline_filter.h ) s.files += %w( src/core/ext/filters/client_channel/health/health.pb.h ) diff --git a/package.xml b/package.xml index eca74c8f167..fd712698a62 100644 --- a/package.xml +++ b/package.xml @@ -310,6 +310,7 @@ + diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 5717d3e66d2..6b6045c98f6 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -51,6 +51,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" @@ -174,6 +175,7 @@ class ChannelData { private: class ConnectivityStateAndPickerSetter; class ServiceConfigSetter; + class GrpcSubchannel; class ClientChannelControlHelper; class ExternalConnectivityWatcher { @@ -221,7 +223,7 @@ class ChannelData { ~ChannelData(); static bool ProcessResolverResultLocked( - void* arg, Resolver::Result* result, const char** lb_policy_name, + void* arg, const Resolver::Result& result, const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error); @@ -270,6 +272,7 @@ class ChannelData { OrphanablePtr resolving_lb_policy_; grpc_connectivity_state_tracker state_tracker_; ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_; + UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; bool received_first_resolver_result_ = false; @@ -954,6 +957,65 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( &self->chand_->state_tracker_, self->state_, &self->my_closure_); } +// +// ChannelData::GrpcSubchannel +// + +// This class is a wrapper for Subchannel that hides details of the +// channel's implementation (such as the health check service name) from +// the LB policy API. +// +// Note that no synchronization is needed here, because even if the +// underlying subchannel is shared between channels, this wrapper will only +// be used within one channel, so it will always be synchronized by the +// control plane combiner. +class ChannelData::GrpcSubchannel : public SubchannelInterface { + public: + GrpcSubchannel(Subchannel* subchannel, + UniquePtr health_check_service_name) + : subchannel_(subchannel), + health_check_service_name_(std::move(health_check_service_name)) {} + + ~GrpcSubchannel() { GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); } + + grpc_connectivity_state CheckConnectivityState( + RefCountedPtr* connected_subchannel) + override { + RefCountedPtr tmp; + auto retval = subchannel_->CheckConnectivityState( + health_check_service_name_.get(), &tmp); + *connected_subchannel = std::move(tmp); + return retval; + } + + void WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr watcher) override { + subchannel_->WatchConnectivityState( + initial_state, + UniquePtr(gpr_strdup(health_check_service_name_.get())), + std::move(watcher)); + } + + void CancelConnectivityStateWatch( + ConnectivityStateWatcher* watcher) override { + subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), + watcher); + } + + void AttemptToConnect() override { subchannel_->AttemptToConnect(); } + + channelz::SubchannelNode* channelz_node() override { + return subchannel_->channelz_node(); + } + + void ResetBackoff() override { subchannel_->ResetBackoff(); } + + private: + Subchannel* subchannel_; + UniquePtr health_check_service_name_; +}; + // // ChannelData::ClientChannelControlHelper // @@ -970,15 +1032,26 @@ class ChannelData::ClientChannelControlHelper "ClientChannelControlHelper"); } - Subchannel* CreateSubchannel(const grpc_channel_args& args) override { + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override { + bool inhibit_health_checking = grpc_channel_arg_get_bool( + grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); + UniquePtr health_check_service_name; + if (!inhibit_health_checking) { + health_check_service_name.reset( + gpr_strdup(chand_->health_check_service_name_.get())); + } + static const char* args_to_remove[] = {GRPC_ARG_INHIBIT_HEALTH_CHECKING}; grpc_arg arg = SubchannelPoolInterface::CreateChannelArg( chand_->subchannel_pool_.get()); - grpc_channel_args* new_args = - grpc_channel_args_copy_and_add(&args, &arg, 1); + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1); Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(new_args); grpc_channel_args_destroy(new_args); - return subchannel; + if (subchannel == nullptr) return nullptr; + return MakeRefCounted(subchannel, + std::move(health_check_service_name)); } grpc_channel* CreateChannel(const char* target, @@ -1211,14 +1284,14 @@ void ChannelData::ProcessLbPolicy( // Synchronous callback from ResolvingLoadBalancingPolicy to process a // resolver result update. bool ChannelData::ProcessResolverResultLocked( - void* arg, Resolver::Result* result, const char** lb_policy_name, + void* arg, const Resolver::Result& result, const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error) { ChannelData* chand = static_cast(arg); RefCountedPtr service_config; // If resolver did not return a service config or returned an invalid service // config, we need a fallback service config. - if (result->service_config_error != GRPC_ERROR_NONE) { + if (result.service_config_error != GRPC_ERROR_NONE) { // If the service config was invalid, then fallback to the saved service // config. If there is no saved config either, use the default service // config. @@ -1239,7 +1312,7 @@ bool ChannelData::ProcessResolverResultLocked( } service_config = chand->default_service_config_; } - } else if (result->service_config == nullptr) { + } else if (result.service_config == nullptr) { if (chand->default_service_config_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, @@ -1250,11 +1323,11 @@ bool ChannelData::ProcessResolverResultLocked( service_config = chand->default_service_config_; } } else { - service_config = result->service_config; + service_config = result.service_config; } - *service_config_error = GRPC_ERROR_REF(result->service_config_error); + *service_config_error = GRPC_ERROR_REF(result.service_config_error); if (service_config == nullptr && - result->service_config_error != GRPC_ERROR_NONE) { + result.service_config_error != GRPC_ERROR_NONE) { return false; } // Process service config. @@ -1267,19 +1340,6 @@ bool ChannelData::ProcessResolverResultLocked( service_config->GetGlobalParsedConfig( internal::ClientChannelServiceConfigParser::ParserIndex())); } - // TODO(roth): Eliminate this hack as part of hiding health check - // service name from LB policy API. As part of this, change the API - // for this function to pass in result as a const reference. - if (parsed_service_config != nullptr && - parsed_service_config->health_check_service_name() != nullptr) { - grpc_arg new_arg = grpc_channel_arg_string_create( - const_cast("grpc.temp.health_check"), - const_cast(parsed_service_config->health_check_service_name())); - grpc_channel_args* new_args = - grpc_channel_args_copy_and_add(result->args, &new_arg, 1); - grpc_channel_args_destroy(result->args); - result->args = new_args; - } // Check if the config has changed. const bool service_config_changed = ((service_config == nullptr) != @@ -1296,6 +1356,14 @@ bool ChannelData::ProcessResolverResultLocked( "chand=%p: resolver returned updated service config: \"%s\"", chand, service_config_json.get()); } + // Save health check service name. + if (service_config != nullptr) { + chand->health_check_service_name_.reset( + gpr_strdup(parsed_service_config->health_check_service_name())); + } else { + chand->health_check_service_name_.reset(); + } + // Save service config. chand->saved_service_config_ = std::move(service_config); } // We want to set the service config at least once. This should not really be @@ -1314,7 +1382,7 @@ bool ChannelData::ProcessResolverResultLocked( chand->saved_service_config_); } UniquePtr processed_lb_policy_name; - chand->ProcessLbPolicy(*result, parsed_service_config, + chand->ProcessLbPolicy(result, parsed_service_config, &processed_lb_policy_name, lb_policy_config); // Swap out the data used by GetChannelInfo(). { @@ -1336,8 +1404,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { LoadBalancingPolicy::PickResult result = picker_->Pick(LoadBalancingPolicy::PickArgs()); if (result.connected_subchannel != nullptr) { - result.connected_subchannel->Ping(op->send_ping.on_initiate, - op->send_ping.on_ack); + ConnectedSubchannel* connected_subchannel = + static_cast(result.connected_subchannel.get()); + connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); } else { if (result.error == GRPC_ERROR_NONE) { result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 5920254a9ef..2cadcc31998 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -24,7 +24,7 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/service_config.h" -#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Used only if type is PICK_COMPLETE. Will be set to the selected /// subchannel, or nullptr if the LB policy decides to drop the call. - RefCountedPtr connected_subchannel; + RefCountedPtr connected_subchannel; /// Used only if type is PICK_TRANSIENT_FAILURE. /// Error to be set when returning a transient failure. @@ -184,8 +184,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { virtual ~ChannelControlHelper() = default; /// Creates a new subchannel with the specified channel args. - virtual Subchannel* CreateSubchannel(const grpc_channel_args& args) - GRPC_ABSTRACT; + virtual RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) GRPC_ABSTRACT; /// Creates a channel with the specified target and channel args. /// This can be used in cases where the LB policy needs to create a diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 2c652e4c6e6..a3a2a44eb0e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -293,7 +293,8 @@ class GrpcLb : public LoadBalancingPolicy { explicit Helper(RefCountedPtr parent) : parent_(std::move(parent)) {} - Subchannel* CreateSubchannel(const grpc_channel_args& args) override; + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; grpc_channel* CreateChannel(const char* target, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, @@ -620,7 +621,8 @@ bool GrpcLb::Helper::CalledByCurrentChild() const { return child_ == parent_->child_policy_.get(); } -Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) { +RefCountedPtr GrpcLb::Helper::CreateSubchannel( + const grpc_channel_args& args) { if (parent_->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { return nullptr; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 1b0dd230b49..4680117fede 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -68,8 +68,9 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel) - : SubchannelData(subchannel_list, address, subchannel) {} + const ServerAddress& address, + RefCountedPtr subchannel) + : SubchannelData(subchannel_list, address, std::move(subchannel)) {} void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override; @@ -112,7 +113,8 @@ class PickFirst : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - explicit Picker(RefCountedPtr connected_subchannel) + explicit Picker( + RefCountedPtr connected_subchannel) : connected_subchannel_(std::move(connected_subchannel)) {} PickResult Pick(PickArgs args) override { @@ -123,7 +125,7 @@ class PickFirst : public LoadBalancingPolicy { } private: - RefCountedPtr connected_subchannel_; + RefCountedPtr connected_subchannel_; }; // Helper class to ensure that any function that modifies the child refs diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 0b9915de28e..3b8ec4f2872 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -83,8 +83,9 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel) - : SubchannelData(subchannel_list, address, subchannel) {} + const ServerAddress& address, + RefCountedPtr subchannel) + : SubchannelData(subchannel_list, address, std::move(subchannel)) {} grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; @@ -156,7 +157,7 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobin* parent_; size_t last_picked_index_; - InlinedVector, 10> subchannels_; + InlinedVector, 10> subchannels_; }; // Helper class to ensure that any function that modifies the child refs diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 93b4bbd369a..8929bc4ab1e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -27,7 +27,10 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" +// TODO(roth): Should not need the include of subchannel.h here, since +// that implementation should be hidden from the LB policy API. #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/abstract.h" @@ -88,11 +91,11 @@ class SubchannelData { } // Returns a pointer to the subchannel. - Subchannel* subchannel() const { return subchannel_; } + SubchannelInterface* subchannel() const { return subchannel_.get(); } // Returns the connected subchannel. Will be null if the subchannel // is not connected. - ConnectedSubchannel* connected_subchannel() const { + ConnectedSubchannelInterface* connected_subchannel() const { return connected_subchannel_.get(); } @@ -102,8 +105,8 @@ class SubchannelData { // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { GPR_ASSERT(pending_watcher_ == nullptr); - connectivity_state_ = subchannel()->CheckConnectivityState( - subchannel_list_->health_check_service_name(), &connected_subchannel_); + connectivity_state_ = + subchannel()->CheckConnectivityState(&connected_subchannel_); return connectivity_state_; } @@ -128,7 +131,8 @@ class SubchannelData { protected: SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel); + const ServerAddress& address, + RefCountedPtr subchannel); virtual ~SubchannelData(); @@ -140,7 +144,7 @@ class SubchannelData { private: // Watcher for subchannel connectivity state. - class Watcher : public Subchannel::ConnectivityStateWatcher { + class Watcher : public SubchannelInterface::ConnectivityStateWatcher { public: Watcher( SubchannelData* subchannel_data, @@ -150,9 +154,9 @@ class SubchannelData { ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } - void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) override; + void OnConnectivityStateChange(grpc_connectivity_state new_state, + RefCountedPtr + connected_subchannel) override; grpc_pollset_set* interested_parties() override { return subchannel_list_->policy()->interested_parties(); @@ -169,7 +173,7 @@ class SubchannelData { RefCountedPtr> subchannel_list, grpc_connectivity_state state, - RefCountedPtr connected_subchannel); + RefCountedPtr connected_subchannel); ~Updater() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); @@ -182,7 +186,7 @@ class SubchannelData { RefCountedPtr> subchannel_list_; const grpc_connectivity_state state_; - RefCountedPtr connected_subchannel_; + RefCountedPtr connected_subchannel_; grpc_closure closure_; }; @@ -196,12 +200,12 @@ class SubchannelData { // Backpointer to owning subchannel list. Not owned. SubchannelList* subchannel_list_; // The subchannel. - Subchannel* subchannel_; + RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. - Subchannel::ConnectivityStateWatcher* pending_watcher_ = nullptr; + SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr; // Data updated by the watcher. grpc_connectivity_state connectivity_state_; - RefCountedPtr connected_subchannel_; + RefCountedPtr connected_subchannel_; }; // A list of subchannels. @@ -235,9 +239,6 @@ class SubchannelList : public InternallyRefCounted { // Accessors. LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } - const char* health_check_service_name() const { - return health_check_service_name_.get(); - } // Resets connection backoff of all subchannels. // TODO(roth): We will probably need to rethink this as part of moving @@ -275,8 +276,6 @@ class SubchannelList : public InternallyRefCounted { TraceFlag* tracer_; - UniquePtr health_check_service_name_; - grpc_combiner* combiner_; // The list of subchannels. @@ -300,7 +299,7 @@ template void SubchannelData::Watcher:: OnConnectivityStateChange( grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) { + RefCountedPtr connected_subchannel) { // Will delete itself. New(subchannel_data_, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), @@ -314,7 +313,7 @@ SubchannelData::Watcher::Updater:: RefCountedPtr> subchannel_list, grpc_connectivity_state state, - RefCountedPtr connected_subchannel) + RefCountedPtr connected_subchannel) : subchannel_data_(subchannel_data), subchannel_list_(std::move(subchannel_list)), state_(state), @@ -336,7 +335,7 @@ void SubchannelData::Watcher::Updater:: "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), - sd->subchannel_list_->num_subchannels(), sd->subchannel_, + sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(), grpc_connectivity_state_name(self->state_), self->connected_subchannel_.get(), sd->subchannel_list_->shutting_down(), sd->pending_watcher_); @@ -360,9 +359,9 @@ void SubchannelData::Watcher::Updater:: template SubchannelData::SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel) + const ServerAddress& address, RefCountedPtr subchannel) : subchannel_list_(subchannel_list), - subchannel_(subchannel), + subchannel_(std::move(subchannel)), // We assume that the current state is IDLE. If not, we'll get a // callback telling us that. connectivity_state_(GRPC_CHANNEL_IDLE) {} @@ -382,10 +381,9 @@ void SubchannelData:: " (subchannel %p): unreffing subchannel", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_); + subchannel_.get()); } - GRPC_SUBCHANNEL_UNREF(subchannel_, reason); - subchannel_ = nullptr; + subchannel_.reset(); connected_subchannel_.reset(); } } @@ -407,16 +405,16 @@ void SubchannelDatatracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_, grpc_connectivity_state_name(connectivity_state_)); + subchannel_.get(), + grpc_connectivity_state_name(connectivity_state_)); } GPR_ASSERT(pending_watcher_ == nullptr); pending_watcher_ = New(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( connectivity_state_, - UniquePtr( - gpr_strdup(subchannel_list_->health_check_service_name())), - UniquePtr(pending_watcher_)); + UniquePtr( + pending_watcher_)); } template @@ -428,11 +426,10 @@ void SubchannelData:: " (subchannel %p): canceling connectivity watch (%s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_, reason); + subchannel_.get(), reason); } if (pending_watcher_ != nullptr) { - subchannel_->CancelConnectivityStateWatch( - subchannel_list_->health_check_service_name(), pending_watcher_); + subchannel_->CancelConnectivityStateWatch(pending_watcher_); pending_watcher_ = nullptr; } } @@ -463,25 +460,12 @@ SubchannelList::SubchannelList( tracer_->name(), policy, this, addresses.size()); } subchannels_.reserve(addresses.size()); - // Find health check service name. - const bool inhibit_health_checking = grpc_channel_arg_get_bool( - grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); - if (!inhibit_health_checking) { - const char* health_check_service_name = grpc_channel_arg_get_string( - grpc_channel_args_find(&args, "grpc.temp.health_check")); - if (health_check_service_name != nullptr) { - health_check_service_name_.reset(gpr_strdup(health_check_service_name)); - } - } // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. - // We also remove the health-checking-related args, since we are - // handling that here. // We remove the service config, since it will be passed into the // subchannel via call context. - static const char* keys_to_remove[] = { - GRPC_ARG_SUBCHANNEL_ADDRESS, "grpc.temp.health_check", - GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_SERVICE_CONFIG}; + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_SERVICE_CONFIG}; // Create a subchannel for each address. for (size_t i = 0; i < addresses.size(); i++) { // TODO(roth): we should ideally hide this from the LB policy code. In @@ -504,7 +488,8 @@ SubchannelList::SubchannelList( &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add.data(), args_to_add.size()); gpr_free(args_to_add[subchannel_address_arg_index].value.string); - Subchannel* subchannel = helper->CreateSubchannel(*new_args); + RefCountedPtr subchannel = + helper->CreateSubchannel(*new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { // Subchannel could not be created. @@ -523,11 +508,11 @@ SubchannelList::SubchannelList( gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR ": Created subchannel %p for address uri %s", - tracer_->name(), policy_, this, subchannels_.size(), subchannel, - address_uri); + tracer_->name(), policy_, this, subchannels_.size(), + subchannel.get(), address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(this, addresses[i], subchannel); + subchannels_.emplace_back(this, addresses[i], std::move(subchannel)); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index d70042af229..b198e0e8637 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -334,7 +334,8 @@ class XdsLb : public LoadBalancingPolicy { explicit FallbackHelper(RefCountedPtr parent) : parent_(std::move(parent)) {} - Subchannel* CreateSubchannel(const grpc_channel_args& args) override; + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; grpc_channel* CreateChannel(const char* target, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, @@ -374,7 +375,8 @@ class XdsLb : public LoadBalancingPolicy { explicit Helper(RefCountedPtr entry) : entry_(std::move(entry)) {} - Subchannel* CreateSubchannel(const grpc_channel_args& args) override; + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; grpc_channel* CreateChannel(const char* target, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, @@ -579,7 +581,7 @@ bool XdsLb::FallbackHelper::CalledByCurrentFallback() const { return child_ == parent_->fallback_policy_.get(); } -Subchannel* XdsLb::FallbackHelper::CreateSubchannel( +RefCountedPtr XdsLb::FallbackHelper::CreateSubchannel( const grpc_channel_args& args) { if (parent_->shutting_down_ || (!CalledByPendingFallback() && !CalledByCurrentFallback())) { @@ -1997,7 +1999,8 @@ bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const { return child_ == entry_->child_policy_.get(); } -Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel( +RefCountedPtr +XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel( const grpc_channel_args& args) { if (entry_->parent_->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index 3fe2ee74c92..d23ac1f4e33 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -106,7 +106,8 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper RefCountedPtr parent) : parent_(std::move(parent)) {} - Subchannel* CreateSubchannel(const grpc_channel_args& args) override { + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override { if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(args); @@ -536,7 +537,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( if (process_resolver_result_ != nullptr) { grpc_error* service_config_error = GRPC_ERROR_NONE; service_config_changed = process_resolver_result_( - process_resolver_result_user_data_, &result, &lb_policy_name, + process_resolver_result_user_data_, result, &lb_policy_name, &lb_policy_config, &service_config_error); if (service_config_error != GRPC_ERROR_NONE) { service_config_error_string = diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index cc9f3176cce..679c8d0fba0 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -69,7 +69,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // empty, it means that we don't have a valid service config to use, and we // should set the channel to be in TRANSIENT_FAILURE. typedef bool (*ProcessResolverResultCallback)( - void* user_data, Resolver::Result* result, const char** lb_policy_name, + void* user_data, const Resolver::Result& result, + const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error); // If error is set when this returns, then construction failed, and diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index cd778976166..cc3457e1e96 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -83,7 +83,7 @@ ConnectedSubchannel::ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, RefCountedPtr channelz_subchannel, intptr_t socket_uuid) - : RefCounted(&grpc_trace_stream_refcount), + : ConnectedSubchannelInterface(&grpc_trace_stream_refcount), channel_stack_(channel_stack), args_(grpc_channel_args_copy(args)), channelz_subchannel_(std::move(channelz_subchannel)), @@ -376,25 +376,17 @@ class Subchannel::ConnectedSubchannelStateWatcher { void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( UniquePtr watcher) { - watcher->next_ = head_; - head_ = watcher.release(); + watchers_.insert(MakePair(watcher.get(), std::move(watcher))); } void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( ConnectivityStateWatcher* watcher) { - for (ConnectivityStateWatcher** w = &head_; *w != nullptr; w = &(*w)->next_) { - if (*w == watcher) { - *w = watcher->next_; - Delete(watcher); - return; - } - } - GPR_UNREACHABLE_CODE(return ); + watchers_.erase(watcher); } void Subchannel::ConnectivityStateWatcherList::NotifyLocked( Subchannel* subchannel, grpc_connectivity_state state) { - for (ConnectivityStateWatcher* w = head_; w != nullptr; w = w->next_) { + for (const auto& p : watchers_) { RefCountedPtr connected_subchannel; if (state == GRPC_CHANNEL_READY) { connected_subchannel = subchannel->connected_subchannel_; @@ -407,15 +399,7 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked( // the notification into the client_channel control-plane combiner // before processing it. But if we ever have any other callers here, // we will probably need to change this. - w->OnConnectivityStateChange(state, std::move(connected_subchannel)); - } -} - -void Subchannel::ConnectivityStateWatcherList::Clear() { - while (head_ != nullptr) { - ConnectivityStateWatcher* next = head_->next_; - Delete(head_); - head_ = next; + p.second->OnConnectivityStateChange(state, std::move(connected_subchannel)); } } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e0741bb28fa..2f05792b872 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,7 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" +#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" @@ -69,7 +70,7 @@ namespace grpc_core { class SubchannelCall; -class ConnectedSubchannel : public RefCounted { +class ConnectedSubchannel : public ConnectedSubchannelInterface { public: struct CallArgs { grpc_polling_entity* pollent; @@ -96,7 +97,7 @@ class ConnectedSubchannel : public RefCounted { grpc_error** error); grpc_channel_stack* channel_stack() const { return channel_stack_; } - const grpc_channel_args* args() const { return args_; } + const grpc_channel_args* args() const override { return args_; } channelz::SubchannelNode* channelz_subchannel() const { return channelz_subchannel_.get(); } @@ -176,37 +177,9 @@ class SubchannelCall { // A subchannel that knows how to connect to exactly one target address. It // provides a target for load balancing. class Subchannel { - private: - class ConnectivityStateWatcherList; // Forward declaration. - public: - class ConnectivityStateWatcher { - public: - virtual ~ConnectivityStateWatcher() = default; - - // Will be invoked whenever the subchannel's connectivity state - // changes. There will be only one invocation of this method on a - // given watcher instance at any given time. - // - // When the state changes to READY, connected_subchannel will - // contain a ref to the connected subchannel. When it changes from - // READY to some other state, the implementation must release its - // ref to the connected subchannel. - virtual void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) // NOLINT - GRPC_ABSTRACT; - - virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS - - private: - // For access to next_. - friend class Subchannel::ConnectivityStateWatcherList; - - ConnectivityStateWatcher* next_ = nullptr; - }; + typedef SubchannelInterface::ConnectivityStateWatcher + ConnectivityStateWatcher; // The ctor and dtor are not intended to use directly. Subchannel(SubchannelKey* key, grpc_connector* connector, @@ -296,12 +269,15 @@ class Subchannel { // Notifies all watchers in the list about a change to state. void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state); - void Clear(); + void Clear() { watchers_.clear(); } - bool empty() const { return head_ == nullptr; } + bool empty() const { return watchers_.empty(); } private: - ConnectivityStateWatcher* head_ = nullptr; + // TODO(roth): This could be a set instead of a map if we had a set + // implementation. + Map> + watchers_; }; // A map that tracks ConnectivityStateWatchers using a particular health diff --git a/src/core/ext/filters/client_channel/subchannel_interface.h b/src/core/ext/filters/client_channel/subchannel_interface.h new file mode 100644 index 00000000000..0a471045f03 --- /dev/null +++ b/src/core/ext/filters/client_channel/subchannel_interface.h @@ -0,0 +1,109 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H + +#include + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" + +namespace grpc_core { + +// TODO(roth): In a subsequent PR, remove this from this API. +class ConnectedSubchannelInterface + : public RefCounted { + public: + virtual const grpc_channel_args* args() const GRPC_ABSTRACT; + + protected: + template + explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr) + : RefCounted(trace_flag) {} +}; + +class SubchannelInterface : public RefCounted { + public: + class ConnectivityStateWatcher { + public: + virtual ~ConnectivityStateWatcher() = default; + + // Will be invoked whenever the subchannel's connectivity state + // changes. There will be only one invocation of this method on a + // given watcher instance at any given time. + // + // When the state changes to READY, connected_subchannel will + // contain a ref to the connected subchannel. When it changes from + // READY to some other state, the implementation must release its + // ref to the connected subchannel. + virtual void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr + connected_subchannel) // NOLINT + GRPC_ABSTRACT; + + // TODO(roth): Remove this as soon as we move to EventManager-based + // polling. + virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + }; + + virtual ~SubchannelInterface() = default; + + // Returns the current connectivity state of the subchannel. + virtual grpc_connectivity_state CheckConnectivityState( + RefCountedPtr* connected_subchannel) + GRPC_ABSTRACT; + + // Starts watching the subchannel's connectivity state. + // The first callback to the watcher will be delivered when the + // subchannel's connectivity state becomes a value other than + // initial_state, which may happen immediately. + // Subsequent callbacks will be delivered as the subchannel's state + // changes. + // The watcher will be destroyed either when the subchannel is + // destroyed or when CancelConnectivityStateWatch() is called. + // There can be only one watcher of a given subchannel. It is not + // valid to call this method a second time without first cancelling + // the previous watcher using CancelConnectivityStateWatch(). + virtual void WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr watcher) GRPC_ABSTRACT; + + // Cancels a connectivity state watch. + // If the watcher has already been destroyed, this is a no-op. + virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher) + GRPC_ABSTRACT; + + // Attempt to connect to the backend. Has no effect if already connected. + virtual void AttemptToConnect() GRPC_ABSTRACT; + + // TODO(roth): These methods should be removed from this interface to + // bettter hide grpc-specific functionality from the LB policy API. + virtual channelz::SubchannelNode* channelz_node() GRPC_ABSTRACT; + virtual void ResetBackoff() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INTERFACE_H */ diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index bd28422bcd1..ced6d9d8027 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -143,7 +143,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy InterceptRecvTrailingMetadataCallback cb, void* user_data) : parent_(std::move(parent)), cb_(cb), user_data_(user_data) {} - Subchannel* CreateSubchannel(const grpc_channel_args& args) override { + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override { return parent_->channel_control_helper()->CreateSubchannel(args); } diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 2c20d69e6e3..7768bca30f5 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -974,6 +974,7 @@ src/core/ext/filters/client_channel/service_config.cc \ src/core/ext/filters/client_channel/service_config.h \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel.h \ +src/core/ext/filters/client_channel/subchannel_interface.h \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.h \ src/core/ext/filters/deadline/deadline_filter.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index a9dc9fcfe3f..a479a00509a 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -8912,6 +8912,7 @@ "src/core/ext/filters/client_channel/server_address.h", "src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/subchannel.h", + "src/core/ext/filters/client_channel/subchannel_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.h" ], "is_filegroup": true, @@ -8968,6 +8969,7 @@ "src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/subchannel.cc", "src/core/ext/filters/client_channel/subchannel.h", + "src/core/ext/filters/client_channel/subchannel_interface.h", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.h" ],