diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a67792fcd37..0b612e67a33 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -147,9 +147,6 @@ class ChannelData { return service_config_; } - RefCountedPtr GetConnectedSubchannelInDataPlane( - SubchannelInterface* subchannel) const; - grpc_connectivity_state CheckConnectivityState(bool try_to_connect); void AddExternalConnectivityWatcher(grpc_polling_entity pollent, grpc_connectivity_state* state, @@ -164,9 +161,9 @@ class ChannelData { } private: - class SubchannelWrapper; class ConnectivityStateAndPickerSetter; class ServiceConfigSetter; + class GrpcSubchannel; class ClientChannelControlHelper; class ExternalConnectivityWatcher { @@ -265,14 +262,7 @@ class ChannelData { UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; bool received_first_resolver_result_ = false; - // The number of SubchannelWrapper instances referencing a given Subchannel. Map subchannel_refcount_map_; - // Pending ConnectedSubchannel updates for each SubchannelWrapper. - // Updates are queued here in the control plane combiner and then applied - // in the data plane combiner when the picker is updated. - Map, RefCountedPtr, - RefCountedPtrLess> - pending_subchannel_updates_; // // Fields accessed from both data plane and control plane combiners. @@ -716,247 +706,6 @@ class CallData { grpc_metadata_batch send_trailing_metadata_; }; -// -// ChannelData::SubchannelWrapper -// - -// This class is a wrapper for Subchannel that hides details of the -// channel's implementation (such as the health check service name and -// connected subchannel) from the LB policy API. -// -// Note that no synchronization is needed here, because even if the -// underlying subchannel is shared between channels, this wrapper will only -// be used within one channel, so it will always be synchronized by the -// control plane combiner. -class ChannelData::SubchannelWrapper : public SubchannelInterface { - public: - SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, - UniquePtr health_check_service_name) - : SubchannelInterface(&grpc_client_channel_routing_trace), - chand_(chand), - subchannel_(subchannel), - health_check_service_name_(std::move(health_check_service_name)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: creating subchannel wrapper %p for subchannel %p", - chand, this, subchannel_); - } - GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper"); - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); - auto it = chand_->subchannel_refcount_map_.find(subchannel_); - if (it == chand_->subchannel_refcount_map_.end()) { - chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); - it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; - } - ++it->second; - } - } - - ~SubchannelWrapper() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: destroying subchannel wrapper %p for subchannel %p", - chand_, this, subchannel_); - } - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); - auto it = chand_->subchannel_refcount_map_.find(subchannel_); - GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); - --it->second; - if (it->second == 0) { - chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); - chand_->subchannel_refcount_map_.erase(it); - } - } - GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); - GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper"); - } - - grpc_connectivity_state CheckConnectivityState() override { - RefCountedPtr connected_subchannel; - grpc_connectivity_state connectivity_state = - subchannel_->CheckConnectivityState(health_check_service_name_.get(), - &connected_subchannel); - MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); - return connectivity_state; - } - - void WatchConnectivityState( - grpc_connectivity_state initial_state, - UniquePtr watcher) override { - auto& watcher_wrapper = watcher_map_[watcher.get()]; - GPR_ASSERT(watcher_wrapper == nullptr); - watcher_wrapper = New( - std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper")); - subchannel_->WatchConnectivityState( - initial_state, - UniquePtr(gpr_strdup(health_check_service_name_.get())), - OrphanablePtr( - watcher_wrapper)); - } - - void CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface* watcher) override { - auto it = watcher_map_.find(watcher); - GPR_ASSERT(it != watcher_map_.end()); - subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), - it->second); - watcher_map_.erase(it); - } - - void AttemptToConnect() override { subchannel_->AttemptToConnect(); } - - void ResetBackoff() override { subchannel_->ResetBackoff(); } - - const grpc_channel_args* channel_args() override { - return subchannel_->channel_args(); - } - - // Caller must be holding the control-plane combiner. - ConnectedSubchannel* connected_subchannel() const { - return connected_subchannel_.get(); - } - - // Caller must be holding the data-plane combiner. - ConnectedSubchannel* connected_subchannel_in_data_plane() const { - return connected_subchannel_in_data_plane_.get(); - } - void set_connected_subchannel_in_data_plane( - RefCountedPtr connected_subchannel) { - connected_subchannel_in_data_plane_ = std::move(connected_subchannel); - } - - private: - // Subchannel and SubchannelInterface have different interfaces for - // their respective ConnectivityStateWatcherInterface classes. - // The one in Subchannel updates the ConnectedSubchannel along with - // the state, whereas the one in SubchannelInterface does not expose - // the ConnectedSubchannel. - // - // This wrapper provides a bridge between the two. It implements - // Subchannel::ConnectivityStateWatcherInterface and wraps - // the instance of SubchannelInterface::ConnectivityStateWatcherInterface - // that was passed in by the LB policy. We pass an instance of this - // class to the underlying Subchannel, and when we get updates from - // the subchannel, we pass those on to the wrapped watcher to return - // the update to the LB policy. This allows us to set the connected - // subchannel before passing the result back to the LB policy. - class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface { - public: - WatcherWrapper( - UniquePtr - watcher, - RefCountedPtr parent) - : watcher_(std::move(watcher)), parent_(std::move(parent)) {} - - ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); } - - void Orphan() override { Unref(); } - - void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) override { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: connectivity change for subchannel wrapper %p " - "subchannel %p (connected_subchannel=%p state=%s); " - "hopping into combiner", - parent_->chand_, parent_.get(), parent_->subchannel_, - connected_subchannel.get(), - grpc_connectivity_state_name(new_state)); - } - // Will delete itself. - New(Ref(), new_state, std::move(connected_subchannel)); - } - - grpc_pollset_set* interested_parties() override { - return watcher_->interested_parties(); - } - - private: - class Updater { - public: - Updater(RefCountedPtr parent, - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) - : parent_(std::move(parent)), - state_(new_state), - connected_subchannel_(std::move(connected_subchannel)) { - GRPC_CLOSURE_INIT( - &closure_, ApplyUpdateInControlPlaneCombiner, this, - grpc_combiner_scheduler(parent_->parent_->chand_->combiner_)); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); - } - - private: - static void ApplyUpdateInControlPlaneCombiner(void* arg, - grpc_error* error) { - Updater* self = static_cast(arg); - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: processing connectivity change in combiner " - "for subchannel wrapper %p subchannel %p " - "(connected_subchannel=%p state=%s)", - self->parent_->parent_->chand_, self->parent_->parent_.get(), - self->parent_->parent_->subchannel_, - self->connected_subchannel_.get(), - grpc_connectivity_state_name(self->state_)); - } - self->parent_->parent_->MaybeUpdateConnectedSubchannel( - std::move(self->connected_subchannel_)); - self->parent_->watcher_->OnConnectivityStateChange(self->state_); - Delete(self); - } - - RefCountedPtr parent_; - grpc_connectivity_state state_; - RefCountedPtr connected_subchannel_; - grpc_closure closure_; - }; - - UniquePtr watcher_; - RefCountedPtr parent_; - }; - - void MaybeUpdateConnectedSubchannel( - RefCountedPtr connected_subchannel) { - // Update the connected subchannel only if the channel is not shutting - // down. This is because once the channel is shutting down, we - // ignore picker updates from the LB policy, which means that - // ConnectivityStateAndPickerSetter will never process the entries - // in chand_->pending_subchannel_updates_. So we don't want to add - // entries there that will never be processed, since that would - // leave dangling refs to the channel and prevent its destruction. - grpc_error* disconnect_error = chand_->disconnect_error(); - if (disconnect_error != GRPC_ERROR_NONE) return; - // Not shutting down, so do the update. - if (connected_subchannel_ != connected_subchannel) { - connected_subchannel_ = std::move(connected_subchannel); - // Record the new connected subchannel so that it can be updated - // in the data plane combiner the next time the picker is updated. - chand_->pending_subchannel_updates_[Ref( - DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_; - } - } - - ChannelData* chand_; - Subchannel* subchannel_; - UniquePtr health_check_service_name_; - // Maps from the address of the watcher passed to us by the LB policy - // to the address of the WrapperWatcher that we passed to the underlying - // subchannel. This is needed so that when the LB policy calls - // CancelConnectivityStateWatch() with its watcher, we know the - // corresponding WrapperWatcher to cancel on the underlying subchannel. - Map watcher_map_; - // To be accessed only in the control plane combiner. - RefCountedPtr connected_subchannel_; - // To be accessed only in the data plane combiner. - RefCountedPtr connected_subchannel_in_data_plane_; -}; - // // ChannelData::ConnectivityStateAndPickerSetter // @@ -980,13 +729,10 @@ class ChannelData::ConnectivityStateAndPickerSetter { grpc_slice_from_static_string( GetChannelConnectivityStateChangeString(state))); } - // Grab any pending subchannel updates. - pending_subchannel_updates_ = - std::move(chand_->pending_subchannel_updates_); // Bounce into the data plane combiner to reset the picker. GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ConnectivityStateAndPickerSetter"); - GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this, + GRPC_CLOSURE_INIT(&closure_, SetPicker, this, grpc_combiner_scheduler(chand->data_plane_combiner_)); GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); } @@ -1009,38 +755,16 @@ class ChannelData::ConnectivityStateAndPickerSetter { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } - static void SetPickerInDataPlane(void* arg, grpc_error* ignored) { + static void SetPicker(void* arg, grpc_error* ignored) { auto* self = static_cast(arg); - // Handle subchannel updates. - for (auto& p : self->pending_subchannel_updates_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: updating subchannel wrapper %p data plane " - "connected_subchannel to %p", - self->chand_, p.first.get(), p.second.get()); - } - p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); - } - // Swap out the picker. We hang on to the old picker so that it can - // be deleted in the control-plane combiner, since that's where we need - // to unref the subchannel wrappers that are reffed by the picker. - self->picker_.swap(self->chand_->picker_); + // Update picker. + self->chand_->picker_ = std::move(self->picker_); // Re-process queued picks. for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; pick = pick->next) { CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); } - // Pop back into the control plane combiner to delete ourself, so - // that we make sure to unref subchannel wrappers there. This - // includes both the ones reffed by the old picker (now stored in - // self->picker_) and the ones in self->pending_subchannel_updates_. - GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self, - grpc_combiner_scheduler(self->chand_->combiner_)); - GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); - } - - static void CleanUpInControlPlane(void* arg, grpc_error* ignored) { - auto* self = static_cast(arg); + // Clean up. GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, "ConnectivityStateAndPickerSetter"); Delete(self); @@ -1048,9 +772,6 @@ class ChannelData::ConnectivityStateAndPickerSetter { ChannelData* chand_; UniquePtr picker_; - Map, RefCountedPtr, - RefCountedPtrLess> - pending_subchannel_updates_; grpc_closure closure_; }; @@ -1225,6 +946,89 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( &self->chand_->state_tracker_, self->state_, &self->my_closure_); } +// +// ChannelData::GrpcSubchannel +// + +// This class is a wrapper for Subchannel that hides details of the +// channel's implementation (such as the health check service name) from +// the LB policy API. +// +// Note that no synchronization is needed here, because even if the +// underlying subchannel is shared between channels, this wrapper will only +// be used within one channel, so it will always be synchronized by the +// control plane combiner. +class ChannelData::GrpcSubchannel : public SubchannelInterface { + public: + GrpcSubchannel(ChannelData* chand, Subchannel* subchannel, + UniquePtr health_check_service_name) + : chand_(chand), + subchannel_(subchannel), + health_check_service_name_(std::move(health_check_service_name)) { + GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel"); + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + intptr_t subchannel_uuid = subchannel_node->uuid(); + auto it = chand_->subchannel_refcount_map_.find(subchannel_); + if (it == chand_->subchannel_refcount_map_.end()) { + chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); + it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; + } + ++it->second; + } + } + + ~GrpcSubchannel() { + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + intptr_t subchannel_uuid = subchannel_node->uuid(); + auto it = chand_->subchannel_refcount_map_.find(subchannel_); + GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); + --it->second; + if (it->second == 0) { + chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); + chand_->subchannel_refcount_map_.erase(it); + } + } + GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); + GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel"); + } + + grpc_connectivity_state CheckConnectivityState( + RefCountedPtr* connected_subchannel) + override { + RefCountedPtr tmp; + auto retval = subchannel_->CheckConnectivityState( + health_check_service_name_.get(), &tmp); + *connected_subchannel = std::move(tmp); + return retval; + } + + void WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr watcher) override { + subchannel_->WatchConnectivityState( + initial_state, + UniquePtr(gpr_strdup(health_check_service_name_.get())), + std::move(watcher)); + } + + void CancelConnectivityStateWatch( + ConnectivityStateWatcher* watcher) override { + subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), + watcher); + } + + void AttemptToConnect() override { subchannel_->AttemptToConnect(); } + + void ResetBackoff() override { subchannel_->ResetBackoff(); } + + private: + ChannelData* chand_; + Subchannel* subchannel_; + UniquePtr health_check_service_name_; +}; + // // ChannelData::ClientChannelControlHelper // @@ -1262,8 +1066,8 @@ class ChannelData::ClientChannelControlHelper chand_->client_channel_factory_->CreateSubchannel(new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) return nullptr; - return MakeRefCounted( - chand_, subchannel, std::move(health_check_service_name)); + return MakeRefCounted(chand_, subchannel, + std::move(health_check_service_name)); } grpc_channel* CreateChannel(const char* target, @@ -1274,7 +1078,8 @@ class ChannelData::ClientChannelControlHelper void UpdateState( grpc_connectivity_state state, UniquePtr picker) override { - grpc_error* disconnect_error = chand_->disconnect_error(); + grpc_error* disconnect_error = + chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { const char* extra = disconnect_error == GRPC_ERROR_NONE ? "" @@ -1646,13 +1451,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { } LoadBalancingPolicy::PickResult result = picker_->Pick(LoadBalancingPolicy::PickArgs()); - ConnectedSubchannel* connected_subchannel = nullptr; - if (result.subchannel != nullptr) { - SubchannelWrapper* subchannel = - static_cast(result.subchannel.get()); - connected_subchannel = subchannel->connected_subchannel(); - } - if (connected_subchannel != nullptr) { + if (result.connected_subchannel != nullptr) { + ConnectedSubchannel* connected_subchannel = + static_cast(result.connected_subchannel.get()); connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); } else { if (result.error == GRPC_ERROR_NONE) { @@ -1695,10 +1496,6 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { } // Disconnect. if (op->disconnect_with_error != GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, "chand=%p: channel shut down from API: %s", chand, - grpc_error_string(op->disconnect_with_error)); - } grpc_error* error = GRPC_ERROR_NONE; GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong( &error, op->disconnect_with_error, MemoryOrder::ACQ_REL, @@ -1773,17 +1570,6 @@ void ChannelData::RemoveQueuedPick(QueuedPick* to_remove, } } -RefCountedPtr -ChannelData::GetConnectedSubchannelInDataPlane( - SubchannelInterface* subchannel) const { - SubchannelWrapper* subchannel_wrapper = - static_cast(subchannel); - ConnectedSubchannel* connected_subchannel = - subchannel_wrapper->connected_subchannel_in_data_plane(); - if (connected_subchannel == nullptr) return nullptr; - return connected_subchannel->Ref(); -} - void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { auto* chand = static_cast(arg); if (chand->resolving_lb_policy_ != nullptr) { @@ -3711,9 +3497,10 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, - "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)", + "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, " + "error=%s)", chand, calld, PickResultTypeName(result.type), - result.subchannel.get(), grpc_error_string(result.error)); + result.connected_subchannel.get(), grpc_error_string(result.error)); } switch (result.type) { case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: { @@ -3755,16 +3542,11 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { break; default: // PICK_COMPLETE // Handle drops. - if (GPR_UNLIKELY(result.subchannel == nullptr)) { + if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) { result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); - } else { - // Grab a ref to the connected subchannel while we're still - // holding the data plane combiner. - calld->connected_subchannel_ = - chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); - GPR_ASSERT(calld->connected_subchannel_ != nullptr); } + calld->connected_subchannel_ = std::move(result.connected_subchannel); calld->lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready; calld->lb_recv_trailing_metadata_ready_user_data_ = diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index c21e9d90ec4..f98a41dee07 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Used only if type is PICK_COMPLETE. Will be set to the selected /// subchannel, or nullptr if the LB policy decides to drop the call. - RefCountedPtr subchannel; + RefCountedPtr connected_subchannel; /// Used only if type is PICK_TRANSIENT_FAILURE. /// Error to be set when returning a transient failure. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index dad10f0ce86..a87dfda7321 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -575,12 +575,13 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { result = child_picker_->Pick(args); // If pick succeeded, add LB token to initial metadata. if (result.type == PickResult::PICK_COMPLETE && - result.subchannel != nullptr) { + result.connected_subchannel != nullptr) { const grpc_arg* arg = grpc_channel_args_find( - result.subchannel->channel_args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); if (arg == nullptr) { - gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p", - parent_, this, result.subchannel.get()); + gpr_log(GPR_ERROR, + "[grpclb %p picker %p] No LB token for connected subchannel %p", + parent_, this, result.connected_subchannel.get()); abort(); } grpc_mdelem lb_token = {reinterpret_cast(arg->value.pointer.p)}; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 8bf3d825b23..00036d8be64 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -28,6 +28,7 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -84,8 +85,9 @@ class PickFirst : public LoadBalancingPolicy { public: PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, const ServerAddressList& addresses, + grpc_combiner* combiner, const grpc_channel_args& args) - : SubchannelList(policy, tracer, addresses, + : SubchannelList(policy, tracer, addresses, combiner, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' @@ -109,18 +111,19 @@ class PickFirst : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - explicit Picker(RefCountedPtr subchannel) - : subchannel_(std::move(subchannel)) {} + explicit Picker( + RefCountedPtr connected_subchannel) + : connected_subchannel_(std::move(connected_subchannel)) {} PickResult Pick(PickArgs args) override { PickResult result; result.type = PickResult::PICK_COMPLETE; - result.subchannel = subchannel_; + result.connected_subchannel = connected_subchannel_; return result; } private: - RefCountedPtr subchannel_; + RefCountedPtr connected_subchannel_; }; void ShutdownLocked() override; @@ -163,9 +166,6 @@ void PickFirst::ShutdownLocked() { void PickFirst::ExitIdleLocked() { if (shutdown_) return; if (idle_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p exiting idle", this); - } idle_ = false; if (subchannel_list_ == nullptr || subchannel_list_->num_subchannels() == 0) { @@ -200,7 +200,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) { grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args.args, &new_arg, 1); auto subchannel_list = MakeOrphanable( - this, &grpc_lb_pick_first_trace, args.addresses, *new_args); + this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args); grpc_channel_args_destroy(new_args); if (subchannel_list->num_subchannels() == 0) { // Empty update or no valid subchannels. Unsubscribe from all current @@ -351,8 +351,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // some connectivity state notifications. if (connectivity_state == GRPC_CHANNEL_READY) { p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, - UniquePtr(New(subchannel()->Ref()))); + GRPC_CHANNEL_READY, UniquePtr(New( + connected_subchannel()->Ref()))); } else { // CONNECTING p->channel_control_helper()->UpdateState( connectivity_state, UniquePtr(New( @@ -447,13 +447,13 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); - } p->selected_ = this; p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, - UniquePtr(New(subchannel()->Ref()))); + UniquePtr(New(connected_subchannel()->Ref()))); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); + } } void PickFirst::PickFirstSubchannelData:: diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 04308ee254c..c6655c7d9bb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -38,6 +38,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -105,8 +106,9 @@ class RoundRobin : public LoadBalancingPolicy { public: RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer, const ServerAddressList& addresses, + grpc_combiner* combiner, const grpc_channel_args& args) - : SubchannelList(policy, tracer, addresses, + : SubchannelList(policy, tracer, addresses, combiner, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' @@ -153,7 +155,7 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobin* parent_; size_t last_picked_index_; - InlinedVector, 10> subchannels_; + InlinedVector, 10> subchannels_; }; void ShutdownLocked() override; @@ -178,9 +180,10 @@ RoundRobin::Picker::Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list) : parent_(parent) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - RoundRobinSubchannelData* sd = subchannel_list->subchannel(i); - if (sd->connectivity_state() == GRPC_CHANNEL_READY) { - subchannels_.push_back(sd->subchannel()->Ref()); + auto* connected_subchannel = + subchannel_list->subchannel(i)->connected_subchannel(); + if (connected_subchannel != nullptr) { + subchannels_.push_back(connected_subchannel->Ref()); } } // For discussion on why we generate a random starting index for @@ -201,13 +204,14 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) { last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p", + "[RR %p picker %p] returning index %" PRIuPTR + ", connected_subchannel=%p", parent_, this, last_picked_index_, subchannels_[last_picked_index_].get()); } PickResult result; result.type = PickResult::PICK_COMPLETE; - result.subchannel = subchannels_[last_picked_index_]; + result.connected_subchannel = subchannels_[last_picked_index_]; return result; } @@ -420,7 +424,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { } } latest_pending_subchannel_list_ = MakeOrphanable( - this, &grpc_lb_round_robin_trace, args.addresses, *args.args); + this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args); if (latest_pending_subchannel_list_->num_subchannels() == 0) { // If the new list is empty, immediately promote the new list to the // current list and transition to TRANSIENT_FAILURE. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 34cd0f549fe..7d70928a83c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -39,6 +39,7 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -63,7 +64,8 @@ class MySubchannelList }; */ -// All methods will be called from within the client_channel combiner. +// All methods with a Locked() suffix must be called from within the +// client_channel combiner. namespace grpc_core { @@ -91,13 +93,20 @@ class SubchannelData { // Returns a pointer to the subchannel. SubchannelInterface* subchannel() const { return subchannel_.get(); } + // Returns the connected subchannel. Will be null if the subchannel + // is not connected. + ConnectedSubchannelInterface* connected_subchannel() const { + return connected_subchannel_.get(); + } + // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification // pending (i.e., between calling StartConnectivityWatchLocked() and // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { GPR_ASSERT(pending_watcher_ == nullptr); - connectivity_state_ = subchannel_->CheckConnectivityState(); + connectivity_state_ = + subchannel()->CheckConnectivityState(&connected_subchannel_); return connectivity_state_; } @@ -135,8 +144,7 @@ class SubchannelData { private: // Watcher for subchannel connectivity state. - class Watcher - : public SubchannelInterface::ConnectivityStateWatcherInterface { + class Watcher : public SubchannelInterface::ConnectivityStateWatcher { public: Watcher( SubchannelData* subchannel_data, @@ -146,13 +154,42 @@ class SubchannelData { ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } - void OnConnectivityStateChange(grpc_connectivity_state new_state) override; + void OnConnectivityStateChange(grpc_connectivity_state new_state, + RefCountedPtr + connected_subchannel) override; grpc_pollset_set* interested_parties() override { return subchannel_list_->policy()->interested_parties(); } private: + // A fire-and-forget class that bounces into the combiner to process + // a connectivity state update. + class Updater { + public: + Updater( + SubchannelData* + subchannel_data, + RefCountedPtr> + subchannel_list, + grpc_connectivity_state state, + RefCountedPtr connected_subchannel); + + ~Updater() { + subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); + } + + private: + static void OnUpdateLocked(void* arg, grpc_error* error); + + SubchannelData* subchannel_data_; + RefCountedPtr> + subchannel_list_; + const grpc_connectivity_state state_; + RefCountedPtr connected_subchannel_; + grpc_closure closure_; + }; + SubchannelData* subchannel_data_; RefCountedPtr subchannel_list_; }; @@ -165,10 +202,10 @@ class SubchannelData { // The subchannel. RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. - SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = - nullptr; + SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr; // Data updated by the watcher. grpc_connectivity_state connectivity_state_; + RefCountedPtr connected_subchannel_; }; // A list of subchannels. @@ -195,6 +232,7 @@ class SubchannelList : public InternallyRefCounted { // the backoff code out of subchannels and into LB policies. void ResetBackoffLocked(); + // Note: Caller must ensure that this is invoked inside of the combiner. void Orphan() override { ShutdownLocked(); InternallyRefCounted::Unref(DEBUG_LOCATION, "shutdown"); @@ -204,7 +242,7 @@ class SubchannelList : public InternallyRefCounted { protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, + const ServerAddressList& addresses, grpc_combiner* combiner, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args); @@ -225,6 +263,8 @@ class SubchannelList : public InternallyRefCounted { TraceFlag* tracer_; + grpc_combiner* combiner_; + // The list of subchannels. SubchannelVector subchannels_; @@ -244,26 +284,59 @@ class SubchannelList : public InternallyRefCounted { template void SubchannelData::Watcher:: - OnConnectivityStateChange(grpc_connectivity_state new_state) { - if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { + OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) { + // Will delete itself. + New(subchannel_data_, + subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), + new_state, std::move(connected_subchannel)); +} + +template +SubchannelData::Watcher::Updater:: + Updater( + SubchannelData* subchannel_data, + RefCountedPtr> + subchannel_list, + grpc_connectivity_state state, + RefCountedPtr connected_subchannel) + : subchannel_data_(subchannel_data), + subchannel_list_(std::move(subchannel_list)), + state_(state), + connected_subchannel_(std::move(connected_subchannel)) { + GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this, + grpc_combiner_scheduler(subchannel_list_->combiner_)); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); +} + +template +void SubchannelData::Watcher::Updater:: + OnUpdateLocked(void* arg, grpc_error* error) { + Updater* self = static_cast(arg); + SubchannelData* sd = self->subchannel_data_; + if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: state=%s, " - "shutting_down=%d, pending_watcher=%p", - subchannel_list_->tracer()->name(), subchannel_list_->policy(), - subchannel_list_.get(), subchannel_data_->Index(), - subchannel_list_->num_subchannels(), - subchannel_data_->subchannel_.get(), - grpc_connectivity_state_name(new_state), - subchannel_list_->shutting_down(), - subchannel_data_->pending_watcher_); + "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", + sd->subchannel_list_->tracer()->name(), + sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), + sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(), + grpc_connectivity_state_name(self->state_), + self->connected_subchannel_.get(), + sd->subchannel_list_->shutting_down(), sd->pending_watcher_); } - if (!subchannel_list_->shutting_down() && - subchannel_data_->pending_watcher_ != nullptr) { - subchannel_data_->connectivity_state_ = new_state; + if (!sd->subchannel_list_->shutting_down() && + sd->pending_watcher_ != nullptr) { + sd->connectivity_state_ = self->state_; + // Get or release ref to connected subchannel. + sd->connected_subchannel_ = std::move(self->connected_subchannel_); // Call the subclass's ProcessConnectivityChangeLocked() method. - subchannel_data_->ProcessConnectivityChangeLocked(new_state); + sd->ProcessConnectivityChangeLocked(sd->connectivity_state_); } + // Clean up. + Delete(self); } // @@ -298,6 +371,7 @@ void SubchannelData:: subchannel_.get()); } subchannel_.reset(); + connected_subchannel_.reset(); } } @@ -326,7 +400,7 @@ void SubchannelData(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( connectivity_state_, - UniquePtr( + UniquePtr( pending_watcher_)); } @@ -360,12 +434,13 @@ void SubchannelData::ShutdownLocked() { template SubchannelList::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, + const ServerAddressList& addresses, grpc_combiner* combiner, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args) : InternallyRefCounted(tracer), policy_(policy), - tracer_(tracer) { + tracer_(tracer), + combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", @@ -434,6 +509,7 @@ SubchannelList::~SubchannelList() { gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(), policy_, this); } + GRPC_COMBINER_UNREF(combiner_, "subchannel_list"); } template diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 74660cec92b..e790ec25520 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -570,7 +570,7 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { PickResult result = PickFromLocality(key, args); // If pick succeeded, add client stats. if (result.type == PickResult::PICK_COMPLETE && - result.subchannel != nullptr && client_stats_ != nullptr) { + result.connected_subchannel != nullptr && client_stats_ != nullptr) { // TODO(roth): Add support for client stats. } return result; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 99c7721e5e5..dd16eded826 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -86,7 +86,7 @@ ConnectedSubchannel::ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, RefCountedPtr channelz_subchannel, intptr_t socket_uuid) - : RefCounted(&grpc_trace_subchannel_refcount), + : ConnectedSubchannelInterface(&grpc_trace_subchannel_refcount), channel_stack_(channel_stack), args_(grpc_channel_args_copy(args)), channelz_subchannel_(std::move(channelz_subchannel)), @@ -378,12 +378,12 @@ class Subchannel::ConnectedSubchannelStateWatcher { // void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( - OrphanablePtr watcher) { + UniquePtr watcher) { watchers_.insert(MakePair(watcher.get(), std::move(watcher))); } void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( - ConnectivityStateWatcherInterface* watcher) { + ConnectivityStateWatcher* watcher) { watchers_.erase(watcher); } @@ -438,9 +438,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher grpc_connectivity_state state() const { return state_; } - void AddWatcherLocked( - grpc_connectivity_state initial_state, - OrphanablePtr watcher) { + void AddWatcherLocked(grpc_connectivity_state initial_state, + UniquePtr watcher) { if (state_ != initial_state) { RefCountedPtr connected_subchannel; if (state_ == GRPC_CHANNEL_READY) { @@ -452,7 +451,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher watcher_list_.AddWatcherLocked(std::move(watcher)); } - void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher) { + void RemoveWatcherLocked(ConnectivityStateWatcher* watcher) { watcher_list_.RemoveWatcherLocked(watcher); } @@ -528,7 +527,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher void Subchannel::HealthWatcherMap::AddWatcherLocked( Subchannel* subchannel, grpc_connectivity_state initial_state, UniquePtr health_check_service_name, - OrphanablePtr watcher) { + UniquePtr watcher) { // If the health check service name is not already present in the map, // add it. auto it = map_.find(health_check_service_name.get()); @@ -547,8 +546,7 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked( } void Subchannel::HealthWatcherMap::RemoveWatcherLocked( - const char* health_check_service_name, - ConnectivityStateWatcherInterface* watcher) { + const char* health_check_service_name, ConnectivityStateWatcher* watcher) { auto it = map_.find(health_check_service_name); GPR_ASSERT(it != map_.end()); it->second->RemoveWatcherLocked(watcher); @@ -820,7 +818,7 @@ grpc_connectivity_state Subchannel::CheckConnectivityState( void Subchannel::WatchConnectivityState( grpc_connectivity_state initial_state, UniquePtr health_check_service_name, - OrphanablePtr watcher) { + UniquePtr watcher) { MutexLock lock(&mu_); grpc_pollset_set* interested_parties = watcher->interested_parties(); if (interested_parties != nullptr) { @@ -839,8 +837,7 @@ void Subchannel::WatchConnectivityState( } void Subchannel::CancelConnectivityStateWatch( - const char* health_check_service_name, - ConnectivityStateWatcherInterface* watcher) { + const char* health_check_service_name, ConnectivityStateWatcher* watcher) { MutexLock lock(&mu_); grpc_pollset_set* interested_parties = watcher->interested_parties(); if (interested_parties != nullptr) { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9e8de767839..2f05792b872 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,7 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" +#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" @@ -69,7 +70,7 @@ namespace grpc_core { class SubchannelCall; -class ConnectedSubchannel : public RefCounted { +class ConnectedSubchannel : public ConnectedSubchannelInterface { public: struct CallArgs { grpc_polling_entity* pollent; @@ -96,7 +97,7 @@ class ConnectedSubchannel : public RefCounted { grpc_error** error); grpc_channel_stack* channel_stack() const { return channel_stack_; } - const grpc_channel_args* args() const { return args_; } + const grpc_channel_args* args() const override { return args_; } channelz::SubchannelNode* channelz_subchannel() const { return channelz_subchannel_.get(); } @@ -175,35 +176,10 @@ class SubchannelCall { // A subchannel that knows how to connect to exactly one target address. It // provides a target for load balancing. -// -// Note that this is the "real" subchannel implementation, whose API is -// different from the SubchannelInterface that is exposed to LB policy -// implementations. The client channel provides an adaptor class -// (SubchannelWrapper) that "converts" between the two. class Subchannel { public: - class ConnectivityStateWatcherInterface - : public InternallyRefCounted { - public: - virtual ~ConnectivityStateWatcherInterface() = default; - - // Will be invoked whenever the subchannel's connectivity state - // changes. There will be only one invocation of this method on a - // given watcher instance at any given time. - // - // When the state changes to READY, connected_subchannel will - // contain a ref to the connected subchannel. When it changes from - // READY to some other state, the implementation must release its - // ref to the connected subchannel. - virtual void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) // NOLINT - GRPC_ABSTRACT; - - virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS - }; + typedef SubchannelInterface::ConnectivityStateWatcher + ConnectivityStateWatcher; // The ctor and dtor are not intended to use directly. Subchannel(SubchannelKey* key, grpc_connector* connector, @@ -230,8 +206,6 @@ class Subchannel { // Caller doesn't take ownership. const char* GetTargetAddress(); - const grpc_channel_args* channel_args() const { return args_; } - channelz::SubchannelNode* channelz_node(); // Returns the current connectivity state of the subchannel. @@ -251,15 +225,14 @@ class Subchannel { // changes. // The watcher will be destroyed either when the subchannel is // destroyed or when CancelConnectivityStateWatch() is called. - void WatchConnectivityState( - grpc_connectivity_state initial_state, - UniquePtr health_check_service_name, - OrphanablePtr watcher); + void WatchConnectivityState(grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher); // Cancels a connectivity state watch. // If the watcher has already been destroyed, this is a no-op. void CancelConnectivityStateWatch(const char* health_check_service_name, - ConnectivityStateWatcherInterface* watcher); + ConnectivityStateWatcher* watcher); // Attempt to connect to the backend. Has no effect if already connected. void AttemptToConnect(); @@ -284,15 +257,14 @@ class Subchannel { grpc_resolved_address* addr); private: - // A linked list of ConnectivityStateWatcherInterfaces that are monitoring - // the subchannel's state. + // A linked list of ConnectivityStateWatchers that are monitoring the + // subchannel's state. class ConnectivityStateWatcherList { public: ~ConnectivityStateWatcherList() { Clear(); } - void AddWatcherLocked( - OrphanablePtr watcher); - void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher); + void AddWatcherLocked(UniquePtr watcher); + void RemoveWatcherLocked(ConnectivityStateWatcher* watcher); // Notifies all watchers in the list about a change to state. void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state); @@ -304,13 +276,12 @@ class Subchannel { private: // TODO(roth): This could be a set instead of a map if we had a set // implementation. - Map> + Map> watchers_; }; - // A map that tracks ConnectivityStateWatcherInterfaces using a particular - // health check service name. + // A map that tracks ConnectivityStateWatchers using a particular health + // check service name. // // There is one entry in the map for each health check service name. // Entries exist only as long as there are watchers using the @@ -320,12 +291,12 @@ class Subchannel { // state READY. class HealthWatcherMap { public: - void AddWatcherLocked( - Subchannel* subchannel, grpc_connectivity_state initial_state, - UniquePtr health_check_service_name, - OrphanablePtr watcher); + void AddWatcherLocked(Subchannel* subchannel, + grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher); void RemoveWatcherLocked(const char* health_check_service_name, - ConnectivityStateWatcherInterface* watcher); + ConnectivityStateWatcher* watcher); // Notifies the watcher when the subchannel's state changes. void NotifyLocked(grpc_connectivity_state state); diff --git a/src/core/ext/filters/client_channel/subchannel_interface.h b/src/core/ext/filters/client_channel/subchannel_interface.h index 2e448dc5a64..10b1bf124c2 100644 --- a/src/core/ext/filters/client_channel/subchannel_interface.h +++ b/src/core/ext/filters/client_channel/subchannel_interface.h @@ -21,22 +21,42 @@ #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" namespace grpc_core { -// The interface for subchannels that is exposed to LB policy implementations. +// TODO(roth): In a subsequent PR, remove this from this API. +class ConnectedSubchannelInterface + : public RefCounted { + public: + virtual const grpc_channel_args* args() const GRPC_ABSTRACT; + + protected: + template + explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr) + : RefCounted(trace_flag) {} +}; + class SubchannelInterface : public RefCounted { public: - class ConnectivityStateWatcherInterface { + class ConnectivityStateWatcher { public: - virtual ~ConnectivityStateWatcherInterface() = default; + virtual ~ConnectivityStateWatcher() = default; // Will be invoked whenever the subchannel's connectivity state // changes. There will be only one invocation of this method on a // given watcher instance at any given time. - virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) + // + // When the state changes to READY, connected_subchannel will + // contain a ref to the connected subchannel. When it changes from + // READY to some other state, the implementation must release its + // ref to the connected subchannel. + virtual void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr + connected_subchannel) // NOLINT GRPC_ABSTRACT; // TODO(roth): Remove this as soon as we move to EventManager-based @@ -46,14 +66,12 @@ class SubchannelInterface : public RefCounted { GRPC_ABSTRACT_BASE_CLASS }; - template - explicit SubchannelInterface(TraceFlagT* trace_flag = nullptr) - : RefCounted(trace_flag) {} - virtual ~SubchannelInterface() = default; // Returns the current connectivity state of the subchannel. - virtual grpc_connectivity_state CheckConnectivityState() GRPC_ABSTRACT; + virtual grpc_connectivity_state CheckConnectivityState( + RefCountedPtr* connected_subchannel) + GRPC_ABSTRACT; // Starts watching the subchannel's connectivity state. // The first callback to the watcher will be delivered when the @@ -68,12 +86,12 @@ class SubchannelInterface : public RefCounted { // the previous watcher using CancelConnectivityStateWatch(). virtual void WatchConnectivityState( grpc_connectivity_state initial_state, - UniquePtr watcher) GRPC_ABSTRACT; + UniquePtr watcher) GRPC_ABSTRACT; // Cancels a connectivity state watch. // If the watcher has already been destroyed, this is a no-op. - virtual void CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface* watcher) GRPC_ABSTRACT; + virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher) + GRPC_ABSTRACT; // Attempt to connect to the backend. Has no effect if already connected. // If the subchannel is currently in backoff delay due to a previously @@ -87,9 +105,6 @@ class SubchannelInterface : public RefCounted { // attempt will be started as soon as AttemptToConnect() is called. virtual void ResetBackoff() GRPC_ABSTRACT; - // TODO(roth): Need a better non-grpc-specific abstraction here. - virtual const grpc_channel_args* channel_args() GRPC_ABSTRACT; - GRPC_ABSTRACT_BASE_CLASS }; diff --git a/src/core/lib/gprpp/map.h b/src/core/lib/gprpp/map.h index 566691df580..36e32d60c07 100644 --- a/src/core/lib/gprpp/map.h +++ b/src/core/lib/gprpp/map.h @@ -30,7 +30,6 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/pair.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" namespace grpc_core { struct StringLess { @@ -42,13 +41,6 @@ struct StringLess { } }; -template -struct RefCountedPtrLess { - bool operator()(const RefCountedPtr& p1, const RefCountedPtr& p2) { - return p1.get() < p2.get(); - } -}; - namespace testing { class MapTest; } @@ -63,28 +55,8 @@ class Map { typedef Compare key_compare; class iterator; - Map() {} ~Map() { clear(); } - // Copying not currently supported. - Map(const Map& other) = delete; - - // Move support. - Map(Map&& other) : root_(other.root_), size_(other.size_) { - other.root_ = nullptr; - other.size_ = 0; - } - Map& operator=(Map&& other) { - if (this != &other) { - clear(); - root_ = other.root_; - size_ = other.size_; - other.root_ = nullptr; - other.size_ = 0; - } - return *this; - } - T& operator[](key_type&& key); T& operator[](const key_type& key); iterator find(const key_type& k); diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 7766ee186c6..1523ced16d8 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -222,12 +222,7 @@ void grpc_mdctx_global_shutdown() { abort(); } } - // For ASAN builds, we don't want to crash here, because that will - // prevent ASAN from providing leak detection information, which is - // far more useful than this simple assertion. -#ifndef GRPC_ASAN_ENABLED GPR_DEBUG_ASSERT(shard->count == 0); -#endif gpr_free(shard->elems); } } diff --git a/test/core/gprpp/map_test.cc b/test/core/gprpp/map_test.cc index 21aeee82486..30d9eb0b207 100644 --- a/test/core/gprpp/map_test.cc +++ b/test/core/gprpp/map_test.cc @@ -437,35 +437,6 @@ TEST_F(MapTest, LowerBound) { EXPECT_EQ(it, test_map.end()); } -// Test move ctor -TEST_F(MapTest, MoveCtor) { - Map test_map; - for (int i = 0; i < 5; i++) { - test_map.emplace(kKeys[i], Payload(i)); - } - Map test_map2 = std::move(test_map); - for (int i = 0; i < 5; i++) { - EXPECT_EQ(test_map.end(), test_map.find(kKeys[i])); - EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data()); - } -} - -// Test move assignment -TEST_F(MapTest, MoveAssignment) { - Map test_map; - for (int i = 0; i < 5; i++) { - test_map.emplace(kKeys[i], Payload(i)); - } - Map test_map2; - test_map2.emplace("xxx", Payload(123)); - test_map2 = std::move(test_map); - for (int i = 0; i < 5; i++) { - EXPECT_EQ(test_map.end(), test_map.find(kKeys[i])); - EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data()); - } - EXPECT_EQ(test_map2.end(), test_map2.find("xxx")); -} - } // namespace testing } // namespace grpc_core diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 041ce1f45a1..2c1f988d173 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -117,7 +117,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy PickResult Pick(PickArgs args) override { PickResult result = delegate_picker_->Pick(args); if (result.type == PickResult::PICK_COMPLETE && - result.subchannel != nullptr) { + result.connected_subchannel != nullptr) { new (args.call_state->Alloc(sizeof(TrailingMetadataHandler))) TrailingMetadataHandler(&result, cb_, user_data_); }