From 7767fbe683fc95f562a6ae1a22c67828e80afebe Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 12 Jun 2019 06:44:36 -0700 Subject: [PATCH] Hide ConnectedSubchannel from LB policy API. --- .../filters/client_channel/client_channel.cc | 420 +++++++++++++----- .../ext/filters/client_channel/lb_policy.h | 2 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 9 +- .../lb_policy/pick_first/pick_first.cc | 30 +- .../lb_policy/round_robin/round_robin.cc | 20 +- .../lb_policy/subchannel_list.h | 126 ++---- .../client_channel/lb_policy/xds/xds.cc | 2 +- .../ext/filters/client_channel/subchannel.cc | 23 +- .../ext/filters/client_channel/subchannel.h | 71 ++- .../client_channel/subchannel_interface.h | 45 +- src/core/lib/gprpp/map.h | 28 ++ src/core/lib/transport/metadata.cc | 5 + test/core/gprpp/map_test.cc | 29 ++ test/core/util/test_lb_policies.cc | 2 +- 14 files changed, 514 insertions(+), 298 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 81562ab2107..19cceb6bf4f 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -147,6 +147,9 @@ class ChannelData { return service_config_; } + RefCountedPtr GetConnectedSubchannelInDataPlane( + SubchannelInterface* subchannel) const; + grpc_connectivity_state CheckConnectivityState(bool try_to_connect); void AddExternalConnectivityWatcher(grpc_polling_entity pollent, grpc_connectivity_state* state, @@ -161,9 +164,9 @@ class ChannelData { } private: + class SubchannelWrapper; class ConnectivityStateAndPickerSetter; class ServiceConfigSetter; - class GrpcSubchannel; class ClientChannelControlHelper; class ExternalConnectivityWatcher { @@ -262,7 +265,14 @@ class ChannelData { UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; bool received_first_resolver_result_ = false; + // The number of SubchannelWrapper instances referencing a given Subchannel. Map subchannel_refcount_map_; + // Pending ConnectedSubchannel updates for each SubchannelWrapper. + // Updates are queued here in the control plane combiner and then applied + // in the data plane combiner when the picker is updated. + Map, RefCountedPtr, + RefCountedPtrLess> + pending_subchannel_updates_; // // Fields accessed from both data plane and control plane combiners. @@ -706,6 +716,247 @@ class CallData { grpc_metadata_batch send_trailing_metadata_; }; +// +// ChannelData::SubchannelWrapper +// + +// This class is a wrapper for Subchannel that hides details of the +// channel's implementation (such as the health check service name and +// connected subchannel) from the LB policy API. +// +// Note that no synchronization is needed here, because even if the +// underlying subchannel is shared between channels, this wrapper will only +// be used within one channel, so it will always be synchronized by the +// control plane combiner. +class ChannelData::SubchannelWrapper : public SubchannelInterface { + public: + SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, + UniquePtr health_check_service_name) + : SubchannelInterface(&grpc_client_channel_routing_trace), + chand_(chand), + subchannel_(subchannel), + health_check_service_name_(std::move(health_check_service_name)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: creating subchannel wrapper %p for subchannel %p", + chand, this, subchannel_); + } + GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper"); + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + intptr_t subchannel_uuid = subchannel_node->uuid(); + auto it = chand_->subchannel_refcount_map_.find(subchannel_); + if (it == chand_->subchannel_refcount_map_.end()) { + chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); + it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; + } + ++it->second; + } + } + + ~SubchannelWrapper() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: destroying subchannel wrapper %p for subchannel %p", + chand_, this, subchannel_); + } + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + intptr_t subchannel_uuid = subchannel_node->uuid(); + auto it = chand_->subchannel_refcount_map_.find(subchannel_); + GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); + --it->second; + if (it->second == 0) { + chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); + chand_->subchannel_refcount_map_.erase(it); + } + } + GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); + GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper"); + } + + grpc_connectivity_state CheckConnectivityState() override { + RefCountedPtr connected_subchannel; + grpc_connectivity_state connectivity_state = + subchannel_->CheckConnectivityState(health_check_service_name_.get(), + &connected_subchannel); + MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); + return connectivity_state; + } + + void WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr watcher) override { + auto& watcher_wrapper = watcher_map_[watcher.get()]; + GPR_ASSERT(watcher_wrapper == nullptr); + watcher_wrapper = New( + std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper")); + subchannel_->WatchConnectivityState( + initial_state, + UniquePtr(gpr_strdup(health_check_service_name_.get())), + OrphanablePtr( + watcher_wrapper)); + } + + void CancelConnectivityStateWatch( + ConnectivityStateWatcherInterface* watcher) override { + auto it = watcher_map_.find(watcher); + GPR_ASSERT(it != watcher_map_.end()); + subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), + it->second); + watcher_map_.erase(it); + } + + void AttemptToConnect() override { subchannel_->AttemptToConnect(); } + + void ResetBackoff() override { subchannel_->ResetBackoff(); } + + const grpc_channel_args* channel_args() override { + return subchannel_->channel_args(); + } + + // Caller must be holding the control-plane combiner. + ConnectedSubchannel* connected_subchannel() const { + return connected_subchannel_.get(); + } + + // Caller must be holding the data-plane combiner. + ConnectedSubchannel* connected_subchannel_in_data_plane() const { + return connected_subchannel_in_data_plane_.get(); + } + void set_connected_subchannel_in_data_plane( + RefCountedPtr connected_subchannel) { + connected_subchannel_in_data_plane_ = std::move(connected_subchannel); + } + + private: + // Subchannel and SubchannelInterface have different interfaces for + // their respective ConnectivityStateWatcherInterface classes. + // The one in Subchannel updates the ConnectedSubchannel along with + // the state, whereas the one in SubchannelInterface does not expose + // the ConnectedSubchannel. + // + // This wrapper provides a bridge between the two. It implements + // Subchannel::ConnectivityStateWatcherInterface and wraps + // the instance of SubchannelInterface::ConnectivityStateWatcherInterface + // that was passed in by the LB policy. We pass an instance of this + // class to the underlying Subchannel, and when we get updates from + // the subchannel, we pass those on to the wrapped watcher to return + // the update to the LB policy. This allows us to set the connected + // subchannel before passing the result back to the LB policy. + class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface { + public: + WatcherWrapper( + UniquePtr + watcher, + RefCountedPtr parent) + : watcher_(std::move(watcher)), parent_(std::move(parent)) {} + + ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); } + + void Orphan() override { Unref(); } + + void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) override { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: connectivity change for subchannel wrapper %p " + "subchannel %p (connected_subchannel=%p state=%s); " + "hopping into combiner", + parent_->chand_, parent_.get(), parent_->subchannel_, + connected_subchannel.get(), + grpc_connectivity_state_name(new_state)); + } + // Will delete itself. + New(Ref(), new_state, std::move(connected_subchannel)); + } + + grpc_pollset_set* interested_parties() override { + return watcher_->interested_parties(); + } + + private: + class Updater { + public: + Updater(RefCountedPtr parent, + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) + : parent_(std::move(parent)), + state_(new_state), + connected_subchannel_(std::move(connected_subchannel)) { + GRPC_CLOSURE_INIT( + &closure_, ApplyUpdateInControlPlaneCombiner, this, + grpc_combiner_scheduler(parent_->parent_->chand_->combiner_)); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + } + + private: + static void ApplyUpdateInControlPlaneCombiner(void* arg, + grpc_error* error) { + Updater* self = static_cast(arg); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: processing connectivity change in combiner " + "for subchannel wrapper %p subchannel %p " + "(connected_subchannel=%p state=%s)", + self->parent_->parent_->chand_, self->parent_->parent_.get(), + self->parent_->parent_->subchannel_, + self->connected_subchannel_.get(), + grpc_connectivity_state_name(self->state_)); + } + self->parent_->parent_->MaybeUpdateConnectedSubchannel( + std::move(self->connected_subchannel_)); + self->parent_->watcher_->OnConnectivityStateChange(self->state_); + Delete(self); + } + + RefCountedPtr parent_; + grpc_connectivity_state state_; + RefCountedPtr connected_subchannel_; + grpc_closure closure_; + }; + + UniquePtr watcher_; + RefCountedPtr parent_; + }; + + void MaybeUpdateConnectedSubchannel( + RefCountedPtr connected_subchannel) { + // Update the connected subchannel only if the channel is not shutting + // down. This is because once the channel is shutting down, we + // ignore picker updates from the LB policy, which means that + // ConnectivityStateAndPickerSetter will never process the entries + // in chand_->pending_subchannel_updates_. So we don't want to add + // entries there that will never be processed, since that would + // leave dangling refs to the channel and prevent its destruction. + grpc_error* disconnect_error = chand_->disconnect_error(); + if (disconnect_error != GRPC_ERROR_NONE) return; + // Not shutting down, so do the update. + if (connected_subchannel_ != connected_subchannel) { + connected_subchannel_ = std::move(connected_subchannel); + // Record the new connected subchannel so that it can be updated + // in the data plane combiner the next time the picker is updated. + chand_->pending_subchannel_updates_[Ref( + DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_; + } + } + + ChannelData* chand_; + Subchannel* subchannel_; + UniquePtr health_check_service_name_; + // Maps from the address of the watcher passed to us by the LB policy + // to the address of the WrapperWatcher that we passed to the underlying + // subchannel. This is needed so that when the LB policy calls + // CancelConnectivityStateWatch() with its watcher, we know the + // corresponding WrapperWatcher to cancel on the underlying subchannel. + Map watcher_map_; + // To be accessed only in the control plane combiner. + RefCountedPtr connected_subchannel_; + // To be accessed only in the data plane combiner. + RefCountedPtr connected_subchannel_in_data_plane_; +}; + // // ChannelData::ConnectivityStateAndPickerSetter // @@ -729,10 +980,13 @@ class ChannelData::ConnectivityStateAndPickerSetter { grpc_slice_from_static_string( GetChannelConnectivityStateChangeString(state))); } + // Grab any pending subchannel updates. + pending_subchannel_updates_ = + std::move(chand_->pending_subchannel_updates_); // Bounce into the data plane combiner to reset the picker. GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ConnectivityStateAndPickerSetter"); - GRPC_CLOSURE_INIT(&closure_, SetPicker, this, + GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this, grpc_combiner_scheduler(chand->data_plane_combiner_)); GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); } @@ -755,16 +1009,38 @@ class ChannelData::ConnectivityStateAndPickerSetter { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } - static void SetPicker(void* arg, grpc_error* ignored) { + static void SetPickerInDataPlane(void* arg, grpc_error* ignored) { auto* self = static_cast(arg); - // Update picker. - self->chand_->picker_ = std::move(self->picker_); + // Handle subchannel updates. + for (auto& p : self->pending_subchannel_updates_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: updating subchannel wrapper %p data plane " + "connected_subchannel to %p", + self->chand_, p.first.get(), p.second.get()); + } + p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); + } + // Swap out the picker. We hang on to the old picker so that it can + // be deleted in the control-plane combiner, since that's where we need + // to unref the subchannel wrappers that are reffed by the picker. + self->picker_.swap(self->chand_->picker_); // Re-process queued picks. for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; pick = pick->next) { CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); } - // Clean up. + // Pop back into the control plane combiner to delete ourself, so + // that we make sure to unref subchannel wrappers there. This + // includes both the ones reffed by the old picker (now stored in + // self->picker_) and the ones in self->pending_subchannel_updates_. + GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self, + grpc_combiner_scheduler(self->chand_->combiner_)); + GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); + } + + static void CleanUpInControlPlane(void* arg, grpc_error* ignored) { + auto* self = static_cast(arg); GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, "ConnectivityStateAndPickerSetter"); Delete(self); @@ -772,6 +1048,9 @@ class ChannelData::ConnectivityStateAndPickerSetter { ChannelData* chand_; UniquePtr picker_; + Map, RefCountedPtr, + RefCountedPtrLess> + pending_subchannel_updates_; grpc_closure closure_; }; @@ -946,89 +1225,6 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( &self->chand_->state_tracker_, self->state_, &self->my_closure_); } -// -// ChannelData::GrpcSubchannel -// - -// This class is a wrapper for Subchannel that hides details of the -// channel's implementation (such as the health check service name) from -// the LB policy API. -// -// Note that no synchronization is needed here, because even if the -// underlying subchannel is shared between channels, this wrapper will only -// be used within one channel, so it will always be synchronized by the -// control plane combiner. -class ChannelData::GrpcSubchannel : public SubchannelInterface { - public: - GrpcSubchannel(ChannelData* chand, Subchannel* subchannel, - UniquePtr health_check_service_name) - : chand_(chand), - subchannel_(subchannel), - health_check_service_name_(std::move(health_check_service_name)) { - GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel"); - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); - auto it = chand_->subchannel_refcount_map_.find(subchannel_); - if (it == chand_->subchannel_refcount_map_.end()) { - chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); - it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; - } - ++it->second; - } - } - - ~GrpcSubchannel() { - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); - auto it = chand_->subchannel_refcount_map_.find(subchannel_); - GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); - --it->second; - if (it->second == 0) { - chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); - chand_->subchannel_refcount_map_.erase(it); - } - } - GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); - GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel"); - } - - grpc_connectivity_state CheckConnectivityState( - RefCountedPtr* connected_subchannel) - override { - RefCountedPtr tmp; - auto retval = subchannel_->CheckConnectivityState( - health_check_service_name_.get(), &tmp); - *connected_subchannel = std::move(tmp); - return retval; - } - - void WatchConnectivityState( - grpc_connectivity_state initial_state, - UniquePtr watcher) override { - subchannel_->WatchConnectivityState( - initial_state, - UniquePtr(gpr_strdup(health_check_service_name_.get())), - std::move(watcher)); - } - - void CancelConnectivityStateWatch( - ConnectivityStateWatcher* watcher) override { - subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), - watcher); - } - - void AttemptToConnect() override { subchannel_->AttemptToConnect(); } - - void ResetBackoff() override { subchannel_->ResetBackoff(); } - - private: - ChannelData* chand_; - Subchannel* subchannel_; - UniquePtr health_check_service_name_; -}; - // // ChannelData::ClientChannelControlHelper // @@ -1066,8 +1262,8 @@ class ChannelData::ClientChannelControlHelper chand_->client_channel_factory_->CreateSubchannel(new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) return nullptr; - return MakeRefCounted(chand_, subchannel, - std::move(health_check_service_name)); + return MakeRefCounted( + chand_, subchannel, std::move(health_check_service_name)); } grpc_channel* CreateChannel(const char* target, @@ -1078,8 +1274,7 @@ class ChannelData::ClientChannelControlHelper void UpdateState( grpc_connectivity_state state, UniquePtr picker) override { - grpc_error* disconnect_error = - chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE); + grpc_error* disconnect_error = chand_->disconnect_error(); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { const char* extra = disconnect_error == GRPC_ERROR_NONE ? "" @@ -1444,9 +1639,13 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { } LoadBalancingPolicy::PickResult result = picker_->Pick(LoadBalancingPolicy::PickArgs()); - if (result.connected_subchannel != nullptr) { - ConnectedSubchannel* connected_subchannel = - static_cast(result.connected_subchannel.get()); + ConnectedSubchannel* connected_subchannel = nullptr; + if (result.subchannel != nullptr) { + SubchannelWrapper* subchannel = + static_cast(result.subchannel.get()); + connected_subchannel = subchannel->connected_subchannel(); + } + if (connected_subchannel != nullptr) { connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); } else { if (result.error == GRPC_ERROR_NONE) { @@ -1489,6 +1688,10 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { } // Disconnect. if (op->disconnect_with_error != GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { + gpr_log(GPR_INFO, "chand=%p: channel shut down from API: %s", chand, + grpc_error_string(op->disconnect_with_error)); + } grpc_error* error = GRPC_ERROR_NONE; GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong( &error, op->disconnect_with_error, MemoryOrder::ACQ_REL, @@ -1563,6 +1766,17 @@ void ChannelData::RemoveQueuedPick(QueuedPick* to_remove, } } +RefCountedPtr +ChannelData::GetConnectedSubchannelInDataPlane( + SubchannelInterface* subchannel) const { + SubchannelWrapper* subchannel_wrapper = + static_cast(subchannel); + ConnectedSubchannel* connected_subchannel = + subchannel_wrapper->connected_subchannel_in_data_plane(); + if (connected_subchannel == nullptr) return nullptr; + return connected_subchannel->Ref(); +} + void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { auto* chand = static_cast(arg); if (chand->resolving_lb_policy_ != nullptr) { @@ -3490,10 +3704,9 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, - "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, " - "error=%s)", + "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)", chand, calld, PickResultTypeName(result.type), - result.connected_subchannel.get(), grpc_error_string(result.error)); + result.subchannel.get(), grpc_error_string(result.error)); } switch (result.type) { case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: { @@ -3535,11 +3748,16 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { break; default: // PICK_COMPLETE // Handle drops. - if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) { + if (GPR_UNLIKELY(result.subchannel == nullptr)) { result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); + } else { + // Grab a ref to the connected subchannel while we're still + // holding the data plane combiner. + calld->connected_subchannel_ = + chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); + GPR_ASSERT(calld->connected_subchannel_ != nullptr); } - calld->connected_subchannel_ = std::move(result.connected_subchannel); calld->lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready; calld->lb_recv_trailing_metadata_ready_user_data_ = diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index d508932015d..ccaeffe269c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -128,7 +128,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Used only if type is PICK_COMPLETE. Will be set to the selected /// subchannel, or nullptr if the LB policy decides to drop the call. - RefCountedPtr connected_subchannel; + RefCountedPtr subchannel; /// Used only if type is PICK_TRANSIENT_FAILURE. /// Error to be set when returning a transient failure. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a87dfda7321..dad10f0ce86 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -575,13 +575,12 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { result = child_picker_->Pick(args); // If pick succeeded, add LB token to initial metadata. if (result.type == PickResult::PICK_COMPLETE && - result.connected_subchannel != nullptr) { + result.subchannel != nullptr) { const grpc_arg* arg = grpc_channel_args_find( - result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + result.subchannel->channel_args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); if (arg == nullptr) { - gpr_log(GPR_ERROR, - "[grpclb %p picker %p] No LB token for connected subchannel %p", - parent_, this, result.connected_subchannel.get()); + gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p", + parent_, this, result.subchannel.get()); abort(); } grpc_mdelem lb_token = {reinterpret_cast(arg->value.pointer.p)}; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index e4b58eb7558..de17b1f046e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -28,7 +28,6 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -85,9 +84,8 @@ class PickFirst : public LoadBalancingPolicy { public: PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, const ServerAddressList& addresses, - grpc_combiner* combiner, const grpc_channel_args& args) - : SubchannelList(policy, tracer, addresses, combiner, + : SubchannelList(policy, tracer, addresses, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' @@ -111,19 +109,18 @@ class PickFirst : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - explicit Picker( - RefCountedPtr connected_subchannel) - : connected_subchannel_(std::move(connected_subchannel)) {} + explicit Picker(RefCountedPtr subchannel) + : subchannel_(std::move(subchannel)) {} PickResult Pick(PickArgs args) override { PickResult result; result.type = PickResult::PICK_COMPLETE; - result.connected_subchannel = connected_subchannel_; + result.subchannel = subchannel_; return result; } private: - RefCountedPtr connected_subchannel_; + RefCountedPtr subchannel_; }; void ShutdownLocked() override; @@ -166,6 +163,9 @@ void PickFirst::ShutdownLocked() { void PickFirst::ExitIdleLocked() { if (shutdown_) return; if (idle_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p exiting idle", this); + } idle_ = false; if (subchannel_list_ == nullptr || subchannel_list_->num_subchannels() == 0) { @@ -200,7 +200,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) { grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args.args, &new_arg, 1); auto subchannel_list = MakeOrphanable( - this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args); + this, &grpc_lb_pick_first_trace, args.addresses, *new_args); grpc_channel_args_destroy(new_args); if (subchannel_list->num_subchannels() == 0) { // Empty update or no valid subchannels. Unsubscribe from all current @@ -350,8 +350,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // some connectivity state notifications. if (connectivity_state == GRPC_CHANNEL_READY) { p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, UniquePtr(New( - connected_subchannel()->Ref()))); + GRPC_CHANNEL_READY, + UniquePtr(New(subchannel()->Ref()))); } else { // CONNECTING p->channel_control_helper()->UpdateState( connectivity_state, @@ -445,13 +445,13 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. - p->selected_ = this; - p->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, - UniquePtr(New(connected_subchannel()->Ref()))); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); } + p->selected_ = this; + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, + UniquePtr(New(subchannel()->Ref()))); } void PickFirst::PickFirstSubchannelData:: diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 17c65d8b551..02bd319e37e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -38,7 +38,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -106,9 +105,8 @@ class RoundRobin : public LoadBalancingPolicy { public: RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer, const ServerAddressList& addresses, - grpc_combiner* combiner, const grpc_channel_args& args) - : SubchannelList(policy, tracer, addresses, combiner, + : SubchannelList(policy, tracer, addresses, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' @@ -155,7 +153,7 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobin* parent_; size_t last_picked_index_; - InlinedVector, 10> subchannels_; + InlinedVector, 10> subchannels_; }; void ShutdownLocked() override; @@ -180,10 +178,9 @@ RoundRobin::Picker::Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list) : parent_(parent) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - auto* connected_subchannel = - subchannel_list->subchannel(i)->connected_subchannel(); - if (connected_subchannel != nullptr) { - subchannels_.push_back(connected_subchannel->Ref()); + RoundRobinSubchannelData* sd = subchannel_list->subchannel(i); + if (sd->connectivity_state() == GRPC_CHANNEL_READY) { + subchannels_.push_back(sd->subchannel()->Ref()); } } // For discussion on why we generate a random starting index for @@ -204,14 +201,13 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) { last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p picker %p] returning index %" PRIuPTR - ", connected_subchannel=%p", + "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p", parent_, this, last_picked_index_, subchannels_[last_picked_index_].get()); } PickResult result; result.type = PickResult::PICK_COMPLETE; - result.connected_subchannel = subchannels_[last_picked_index_]; + result.subchannel = subchannels_[last_picked_index_]; return result; } @@ -424,7 +420,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { } } latest_pending_subchannel_list_ = MakeOrphanable( - this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args); + this, &grpc_lb_round_robin_trace, args.addresses, *args.args); if (latest_pending_subchannel_list_->num_subchannels() == 0) { // If the new list is empty, immediately promote the new list to the // current list and transition to TRANSIENT_FAILURE. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 7d70928a83c..34cd0f549fe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -39,7 +39,6 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -64,8 +63,7 @@ class MySubchannelList }; */ -// All methods with a Locked() suffix must be called from within the -// client_channel combiner. +// All methods will be called from within the client_channel combiner. namespace grpc_core { @@ -93,20 +91,13 @@ class SubchannelData { // Returns a pointer to the subchannel. SubchannelInterface* subchannel() const { return subchannel_.get(); } - // Returns the connected subchannel. Will be null if the subchannel - // is not connected. - ConnectedSubchannelInterface* connected_subchannel() const { - return connected_subchannel_.get(); - } - // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification // pending (i.e., between calling StartConnectivityWatchLocked() and // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { GPR_ASSERT(pending_watcher_ == nullptr); - connectivity_state_ = - subchannel()->CheckConnectivityState(&connected_subchannel_); + connectivity_state_ = subchannel_->CheckConnectivityState(); return connectivity_state_; } @@ -144,7 +135,8 @@ class SubchannelData { private: // Watcher for subchannel connectivity state. - class Watcher : public SubchannelInterface::ConnectivityStateWatcher { + class Watcher + : public SubchannelInterface::ConnectivityStateWatcherInterface { public: Watcher( SubchannelData* subchannel_data, @@ -154,42 +146,13 @@ class SubchannelData { ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } - void OnConnectivityStateChange(grpc_connectivity_state new_state, - RefCountedPtr - connected_subchannel) override; + void OnConnectivityStateChange(grpc_connectivity_state new_state) override; grpc_pollset_set* interested_parties() override { return subchannel_list_->policy()->interested_parties(); } private: - // A fire-and-forget class that bounces into the combiner to process - // a connectivity state update. - class Updater { - public: - Updater( - SubchannelData* - subchannel_data, - RefCountedPtr> - subchannel_list, - grpc_connectivity_state state, - RefCountedPtr connected_subchannel); - - ~Updater() { - subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); - } - - private: - static void OnUpdateLocked(void* arg, grpc_error* error); - - SubchannelData* subchannel_data_; - RefCountedPtr> - subchannel_list_; - const grpc_connectivity_state state_; - RefCountedPtr connected_subchannel_; - grpc_closure closure_; - }; - SubchannelData* subchannel_data_; RefCountedPtr subchannel_list_; }; @@ -202,10 +165,10 @@ class SubchannelData { // The subchannel. RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. - SubchannelInterface::ConnectivityStateWatcher* pending_watcher_ = nullptr; + SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = + nullptr; // Data updated by the watcher. grpc_connectivity_state connectivity_state_; - RefCountedPtr connected_subchannel_; }; // A list of subchannels. @@ -232,7 +195,6 @@ class SubchannelList : public InternallyRefCounted { // the backoff code out of subchannels and into LB policies. void ResetBackoffLocked(); - // Note: Caller must ensure that this is invoked inside of the combiner. void Orphan() override { ShutdownLocked(); InternallyRefCounted::Unref(DEBUG_LOCATION, "shutdown"); @@ -242,7 +204,7 @@ class SubchannelList : public InternallyRefCounted { protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args); @@ -263,8 +225,6 @@ class SubchannelList : public InternallyRefCounted { TraceFlag* tracer_; - grpc_combiner* combiner_; - // The list of subchannels. SubchannelVector subchannels_; @@ -284,59 +244,26 @@ class SubchannelList : public InternallyRefCounted { template void SubchannelData::Watcher:: - OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr connected_subchannel) { - // Will delete itself. - New(subchannel_data_, - subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), - new_state, std::move(connected_subchannel)); -} - -template -SubchannelData::Watcher::Updater:: - Updater( - SubchannelData* subchannel_data, - RefCountedPtr> - subchannel_list, - grpc_connectivity_state state, - RefCountedPtr connected_subchannel) - : subchannel_data_(subchannel_data), - subchannel_list_(std::move(subchannel_list)), - state_(state), - connected_subchannel_(std::move(connected_subchannel)) { - GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this, - grpc_combiner_scheduler(subchannel_list_->combiner_)); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); -} - -template -void SubchannelData::Watcher::Updater:: - OnUpdateLocked(void* arg, grpc_error* error) { - Updater* self = static_cast(arg); - SubchannelData* sd = self->subchannel_data_; - if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) { + OnConnectivityStateChange(grpc_connectivity_state new_state) { + if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: state=%s, " - "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", - sd->subchannel_list_->tracer()->name(), - sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), - sd->subchannel_list_->num_subchannels(), sd->subchannel_.get(), - grpc_connectivity_state_name(self->state_), - self->connected_subchannel_.get(), - sd->subchannel_list_->shutting_down(), sd->pending_watcher_); + "shutting_down=%d, pending_watcher=%p", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_.get(), subchannel_data_->Index(), + subchannel_list_->num_subchannels(), + subchannel_data_->subchannel_.get(), + grpc_connectivity_state_name(new_state), + subchannel_list_->shutting_down(), + subchannel_data_->pending_watcher_); } - if (!sd->subchannel_list_->shutting_down() && - sd->pending_watcher_ != nullptr) { - sd->connectivity_state_ = self->state_; - // Get or release ref to connected subchannel. - sd->connected_subchannel_ = std::move(self->connected_subchannel_); + if (!subchannel_list_->shutting_down() && + subchannel_data_->pending_watcher_ != nullptr) { + subchannel_data_->connectivity_state_ = new_state; // Call the subclass's ProcessConnectivityChangeLocked() method. - sd->ProcessConnectivityChangeLocked(sd->connectivity_state_); + subchannel_data_->ProcessConnectivityChangeLocked(new_state); } - // Clean up. - Delete(self); } // @@ -371,7 +298,6 @@ void SubchannelData:: subchannel_.get()); } subchannel_.reset(); - connected_subchannel_.reset(); } } @@ -400,7 +326,7 @@ void SubchannelData(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( connectivity_state_, - UniquePtr( + UniquePtr( pending_watcher_)); } @@ -434,13 +360,12 @@ void SubchannelData::ShutdownLocked() { template SubchannelList::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, - const ServerAddressList& addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args) : InternallyRefCounted(tracer), policy_(policy), - tracer_(tracer), - combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { + tracer_(tracer) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", @@ -509,7 +434,6 @@ SubchannelList::~SubchannelList() { gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(), policy_, this); } - GRPC_COMBINER_UNREF(combiner_, "subchannel_list"); } template diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 136d85bba14..ad030c17b5e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -554,7 +554,7 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { PickResult result = PickFromLocality(key, args); // If pick succeeded, add client stats. if (result.type == PickResult::PICK_COMPLETE && - result.connected_subchannel != nullptr && client_stats_ != nullptr) { + result.subchannel != nullptr && client_stats_ != nullptr) { // TODO(roth): Add support for client stats. } return result; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index dd16eded826..99c7721e5e5 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -86,7 +86,7 @@ ConnectedSubchannel::ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, RefCountedPtr channelz_subchannel, intptr_t socket_uuid) - : ConnectedSubchannelInterface(&grpc_trace_subchannel_refcount), + : RefCounted(&grpc_trace_subchannel_refcount), channel_stack_(channel_stack), args_(grpc_channel_args_copy(args)), channelz_subchannel_(std::move(channelz_subchannel)), @@ -378,12 +378,12 @@ class Subchannel::ConnectedSubchannelStateWatcher { // void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( - UniquePtr watcher) { + OrphanablePtr watcher) { watchers_.insert(MakePair(watcher.get(), std::move(watcher))); } void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( - ConnectivityStateWatcher* watcher) { + ConnectivityStateWatcherInterface* watcher) { watchers_.erase(watcher); } @@ -438,8 +438,9 @@ class Subchannel::HealthWatcherMap::HealthWatcher grpc_connectivity_state state() const { return state_; } - void AddWatcherLocked(grpc_connectivity_state initial_state, - UniquePtr watcher) { + void AddWatcherLocked( + grpc_connectivity_state initial_state, + OrphanablePtr watcher) { if (state_ != initial_state) { RefCountedPtr connected_subchannel; if (state_ == GRPC_CHANNEL_READY) { @@ -451,7 +452,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher watcher_list_.AddWatcherLocked(std::move(watcher)); } - void RemoveWatcherLocked(ConnectivityStateWatcher* watcher) { + void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher) { watcher_list_.RemoveWatcherLocked(watcher); } @@ -527,7 +528,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher void Subchannel::HealthWatcherMap::AddWatcherLocked( Subchannel* subchannel, grpc_connectivity_state initial_state, UniquePtr health_check_service_name, - UniquePtr watcher) { + OrphanablePtr watcher) { // If the health check service name is not already present in the map, // add it. auto it = map_.find(health_check_service_name.get()); @@ -546,7 +547,8 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked( } void Subchannel::HealthWatcherMap::RemoveWatcherLocked( - const char* health_check_service_name, ConnectivityStateWatcher* watcher) { + const char* health_check_service_name, + ConnectivityStateWatcherInterface* watcher) { auto it = map_.find(health_check_service_name); GPR_ASSERT(it != map_.end()); it->second->RemoveWatcherLocked(watcher); @@ -818,7 +820,7 @@ grpc_connectivity_state Subchannel::CheckConnectivityState( void Subchannel::WatchConnectivityState( grpc_connectivity_state initial_state, UniquePtr health_check_service_name, - UniquePtr watcher) { + OrphanablePtr watcher) { MutexLock lock(&mu_); grpc_pollset_set* interested_parties = watcher->interested_parties(); if (interested_parties != nullptr) { @@ -837,7 +839,8 @@ void Subchannel::WatchConnectivityState( } void Subchannel::CancelConnectivityStateWatch( - const char* health_check_service_name, ConnectivityStateWatcher* watcher) { + const char* health_check_service_name, + ConnectivityStateWatcherInterface* watcher) { MutexLock lock(&mu_); grpc_pollset_set* interested_parties = watcher->interested_parties(); if (interested_parties != nullptr) { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 2f05792b872..9e8de767839 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,7 +23,6 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" -#include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" @@ -70,7 +69,7 @@ namespace grpc_core { class SubchannelCall; -class ConnectedSubchannel : public ConnectedSubchannelInterface { +class ConnectedSubchannel : public RefCounted { public: struct CallArgs { grpc_polling_entity* pollent; @@ -97,7 +96,7 @@ class ConnectedSubchannel : public ConnectedSubchannelInterface { grpc_error** error); grpc_channel_stack* channel_stack() const { return channel_stack_; } - const grpc_channel_args* args() const override { return args_; } + const grpc_channel_args* args() const { return args_; } channelz::SubchannelNode* channelz_subchannel() const { return channelz_subchannel_.get(); } @@ -176,10 +175,35 @@ class SubchannelCall { // A subchannel that knows how to connect to exactly one target address. It // provides a target for load balancing. +// +// Note that this is the "real" subchannel implementation, whose API is +// different from the SubchannelInterface that is exposed to LB policy +// implementations. The client channel provides an adaptor class +// (SubchannelWrapper) that "converts" between the two. class Subchannel { public: - typedef SubchannelInterface::ConnectivityStateWatcher - ConnectivityStateWatcher; + class ConnectivityStateWatcherInterface + : public InternallyRefCounted { + public: + virtual ~ConnectivityStateWatcherInterface() = default; + + // Will be invoked whenever the subchannel's connectivity state + // changes. There will be only one invocation of this method on a + // given watcher instance at any given time. + // + // When the state changes to READY, connected_subchannel will + // contain a ref to the connected subchannel. When it changes from + // READY to some other state, the implementation must release its + // ref to the connected subchannel. + virtual void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) // NOLINT + GRPC_ABSTRACT; + + virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + }; // The ctor and dtor are not intended to use directly. Subchannel(SubchannelKey* key, grpc_connector* connector, @@ -206,6 +230,8 @@ class Subchannel { // Caller doesn't take ownership. const char* GetTargetAddress(); + const grpc_channel_args* channel_args() const { return args_; } + channelz::SubchannelNode* channelz_node(); // Returns the current connectivity state of the subchannel. @@ -225,14 +251,15 @@ class Subchannel { // changes. // The watcher will be destroyed either when the subchannel is // destroyed or when CancelConnectivityStateWatch() is called. - void WatchConnectivityState(grpc_connectivity_state initial_state, - UniquePtr health_check_service_name, - UniquePtr watcher); + void WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + OrphanablePtr watcher); // Cancels a connectivity state watch. // If the watcher has already been destroyed, this is a no-op. void CancelConnectivityStateWatch(const char* health_check_service_name, - ConnectivityStateWatcher* watcher); + ConnectivityStateWatcherInterface* watcher); // Attempt to connect to the backend. Has no effect if already connected. void AttemptToConnect(); @@ -257,14 +284,15 @@ class Subchannel { grpc_resolved_address* addr); private: - // A linked list of ConnectivityStateWatchers that are monitoring the - // subchannel's state. + // A linked list of ConnectivityStateWatcherInterfaces that are monitoring + // the subchannel's state. class ConnectivityStateWatcherList { public: ~ConnectivityStateWatcherList() { Clear(); } - void AddWatcherLocked(UniquePtr watcher); - void RemoveWatcherLocked(ConnectivityStateWatcher* watcher); + void AddWatcherLocked( + OrphanablePtr watcher); + void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher); // Notifies all watchers in the list about a change to state. void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state); @@ -276,12 +304,13 @@ class Subchannel { private: // TODO(roth): This could be a set instead of a map if we had a set // implementation. - Map> + Map> watchers_; }; - // A map that tracks ConnectivityStateWatchers using a particular health - // check service name. + // A map that tracks ConnectivityStateWatcherInterfaces using a particular + // health check service name. // // There is one entry in the map for each health check service name. // Entries exist only as long as there are watchers using the @@ -291,12 +320,12 @@ class Subchannel { // state READY. class HealthWatcherMap { public: - void AddWatcherLocked(Subchannel* subchannel, - grpc_connectivity_state initial_state, - UniquePtr health_check_service_name, - UniquePtr watcher); + void AddWatcherLocked( + Subchannel* subchannel, grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + OrphanablePtr watcher); void RemoveWatcherLocked(const char* health_check_service_name, - ConnectivityStateWatcher* watcher); + ConnectivityStateWatcherInterface* watcher); // Notifies the watcher when the subchannel's state changes. void NotifyLocked(grpc_connectivity_state state); diff --git a/src/core/ext/filters/client_channel/subchannel_interface.h b/src/core/ext/filters/client_channel/subchannel_interface.h index 10b1bf124c2..2e448dc5a64 100644 --- a/src/core/ext/filters/client_channel/subchannel_interface.h +++ b/src/core/ext/filters/client_channel/subchannel_interface.h @@ -21,42 +21,22 @@ #include -#include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" namespace grpc_core { -// TODO(roth): In a subsequent PR, remove this from this API. -class ConnectedSubchannelInterface - : public RefCounted { - public: - virtual const grpc_channel_args* args() const GRPC_ABSTRACT; - - protected: - template - explicit ConnectedSubchannelInterface(TraceFlagT* trace_flag = nullptr) - : RefCounted(trace_flag) {} -}; - +// The interface for subchannels that is exposed to LB policy implementations. class SubchannelInterface : public RefCounted { public: - class ConnectivityStateWatcher { + class ConnectivityStateWatcherInterface { public: - virtual ~ConnectivityStateWatcher() = default; + virtual ~ConnectivityStateWatcherInterface() = default; // Will be invoked whenever the subchannel's connectivity state // changes. There will be only one invocation of this method on a // given watcher instance at any given time. - // - // When the state changes to READY, connected_subchannel will - // contain a ref to the connected subchannel. When it changes from - // READY to some other state, the implementation must release its - // ref to the connected subchannel. - virtual void OnConnectivityStateChange( - grpc_connectivity_state new_state, - RefCountedPtr - connected_subchannel) // NOLINT + virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) GRPC_ABSTRACT; // TODO(roth): Remove this as soon as we move to EventManager-based @@ -66,12 +46,14 @@ class SubchannelInterface : public RefCounted { GRPC_ABSTRACT_BASE_CLASS }; + template + explicit SubchannelInterface(TraceFlagT* trace_flag = nullptr) + : RefCounted(trace_flag) {} + virtual ~SubchannelInterface() = default; // Returns the current connectivity state of the subchannel. - virtual grpc_connectivity_state CheckConnectivityState( - RefCountedPtr* connected_subchannel) - GRPC_ABSTRACT; + virtual grpc_connectivity_state CheckConnectivityState() GRPC_ABSTRACT; // Starts watching the subchannel's connectivity state. // The first callback to the watcher will be delivered when the @@ -86,12 +68,12 @@ class SubchannelInterface : public RefCounted { // the previous watcher using CancelConnectivityStateWatch(). virtual void WatchConnectivityState( grpc_connectivity_state initial_state, - UniquePtr watcher) GRPC_ABSTRACT; + UniquePtr watcher) GRPC_ABSTRACT; // Cancels a connectivity state watch. // If the watcher has already been destroyed, this is a no-op. - virtual void CancelConnectivityStateWatch(ConnectivityStateWatcher* watcher) - GRPC_ABSTRACT; + virtual void CancelConnectivityStateWatch( + ConnectivityStateWatcherInterface* watcher) GRPC_ABSTRACT; // Attempt to connect to the backend. Has no effect if already connected. // If the subchannel is currently in backoff delay due to a previously @@ -105,6 +87,9 @@ class SubchannelInterface : public RefCounted { // attempt will be started as soon as AttemptToConnect() is called. virtual void ResetBackoff() GRPC_ABSTRACT; + // TODO(roth): Need a better non-grpc-specific abstraction here. + virtual const grpc_channel_args* channel_args() GRPC_ABSTRACT; + GRPC_ABSTRACT_BASE_CLASS }; diff --git a/src/core/lib/gprpp/map.h b/src/core/lib/gprpp/map.h index 36e32d60c07..566691df580 100644 --- a/src/core/lib/gprpp/map.h +++ b/src/core/lib/gprpp/map.h @@ -30,6 +30,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/pair.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" namespace grpc_core { struct StringLess { @@ -41,6 +42,13 @@ struct StringLess { } }; +template +struct RefCountedPtrLess { + bool operator()(const RefCountedPtr& p1, const RefCountedPtr& p2) { + return p1.get() < p2.get(); + } +}; + namespace testing { class MapTest; } @@ -55,8 +63,28 @@ class Map { typedef Compare key_compare; class iterator; + Map() {} ~Map() { clear(); } + // Copying not currently supported. + Map(const Map& other) = delete; + + // Move support. + Map(Map&& other) : root_(other.root_), size_(other.size_) { + other.root_ = nullptr; + other.size_ = 0; + } + Map& operator=(Map&& other) { + if (this != &other) { + clear(); + root_ = other.root_; + size_ = other.size_; + other.root_ = nullptr; + other.size_ = 0; + } + return *this; + } + T& operator[](key_type&& key); T& operator[](const key_type& key); iterator find(const key_type& k); diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 1523ced16d8..7766ee186c6 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -222,7 +222,12 @@ void grpc_mdctx_global_shutdown() { abort(); } } + // For ASAN builds, we don't want to crash here, because that will + // prevent ASAN from providing leak detection information, which is + // far more useful than this simple assertion. +#ifndef GRPC_ASAN_ENABLED GPR_DEBUG_ASSERT(shard->count == 0); +#endif gpr_free(shard->elems); } } diff --git a/test/core/gprpp/map_test.cc b/test/core/gprpp/map_test.cc index 30d9eb0b207..21aeee82486 100644 --- a/test/core/gprpp/map_test.cc +++ b/test/core/gprpp/map_test.cc @@ -437,6 +437,35 @@ TEST_F(MapTest, LowerBound) { EXPECT_EQ(it, test_map.end()); } +// Test move ctor +TEST_F(MapTest, MoveCtor) { + Map test_map; + for (int i = 0; i < 5; i++) { + test_map.emplace(kKeys[i], Payload(i)); + } + Map test_map2 = std::move(test_map); + for (int i = 0; i < 5; i++) { + EXPECT_EQ(test_map.end(), test_map.find(kKeys[i])); + EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data()); + } +} + +// Test move assignment +TEST_F(MapTest, MoveAssignment) { + Map test_map; + for (int i = 0; i < 5; i++) { + test_map.emplace(kKeys[i], Payload(i)); + } + Map test_map2; + test_map2.emplace("xxx", Payload(123)); + test_map2 = std::move(test_map); + for (int i = 0; i < 5; i++) { + EXPECT_EQ(test_map.end(), test_map.find(kKeys[i])); + EXPECT_EQ(i, test_map2.find(kKeys[i])->second.data()); + } + EXPECT_EQ(test_map2.end(), test_map2.find("xxx")); +} + } // namespace testing } // namespace grpc_core diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 2c1f988d173..041ce1f45a1 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -117,7 +117,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy PickResult Pick(PickArgs args) override { PickResult result = delegate_picker_->Pick(args); if (result.type == PickResult::PICK_COMPLETE && - result.connected_subchannel != nullptr) { + result.subchannel != nullptr) { new (args.call_state->Alloc(sizeof(TrailingMetadataHandler))) TrailingMetadataHandler(&result, cb_, user_data_); }