[round robin] delegate to pick_first instead of creating subchannels directly (#32692)
More work on the dualstack backend design: - Change round_robin to delegate to pick_first instead of creating subchannels directly. - Change pick_first such that when it is the child of a petiole policy, it will unconditionally start a health watch. - Change the client-side health checking code such that if client-side health checking is not enabled, it will return the subchannel's raw connectivity state. - As part of this, we introduce a new endpoint_list library to be used by petiole policies, which is intended to replace the existing subchannel_list library. The only policy that will still directly interact with subchannels is pick_first, so the relevant parts of the subchannel_list functionality have been copied directly into that policy. The subchannel_list library will be removed after all petiole policies are updated to delegate to pick_first.pull/33087/head^2
30 changed files with 1359 additions and 442 deletions
@ -0,0 +1,188 @@ |
// Copyright 2015 gRPC authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h> |
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" |
#include <stdlib.h> |
#include <algorithm> |
#include <memory> |
#include <utility> |
#include <vector> |
#include "absl/status/status.h" |
#include "absl/status/statusor.h" |
#include "absl/types/optional.h" |
#include <grpc/impl/connectivity_state.h> |
#include <grpc/support/json.h> |
#include <grpc/support/log.h> |
#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" |
#include "src/core/lib/channel/channel_args.h" |
#include "src/core/lib/config/core_configuration.h" |
#include "src/core/lib/gprpp/debug_location.h" |
#include "src/core/lib/gprpp/orphanable.h" |
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
#include "src/core/lib/iomgr/pollset_set.h" |
#include "src/core/lib/json/json.h" |
#include "src/core/lib/load_balancing/delegating_helper.h" |
#include "src/core/lib/load_balancing/lb_policy.h" |
#include "src/core/lib/load_balancing/lb_policy_registry.h" |
#include "src/core/lib/resolver/server_address.h" |
namespace grpc_core { |
// EndpointList::Endpoint::Helper
class EndpointList::Endpoint::Helper |
: public LoadBalancingPolicy::DelegatingChannelControlHelper { |
public: |
explicit Helper(RefCountedPtr<Endpoint> endpoint) |
: endpoint_(std::move(endpoint)) {} |
~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); } |
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
ServerAddress address, const ChannelArgs& args) override { |
return endpoint_->CreateSubchannel(std::move(address), args); |
} |
void UpdateState( |
grpc_connectivity_state state, const absl::Status& status, |
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override { |
auto old_state = std::exchange(endpoint_->connectivity_state_, state); |
endpoint_->picker_ = std::move(picker); |
endpoint_->OnStateUpdate(old_state, state, status); |
} |
private: |
LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override { |
return endpoint_->endpoint_list_->channel_control_helper(); |
} |
RefCountedPtr<Endpoint> endpoint_; |
}; |
// EndpointList::Endpoint
void EndpointList::Endpoint::Init( |
const ServerAddress& address, const ChannelArgs& args, |
std::shared_ptr<WorkSerializer> work_serializer) { |
ChannelArgs child_args = |
LoadBalancingPolicy::Args lb_policy_args; |
lb_policy_args.work_serializer = std::move(work_serializer); |
lb_policy_args.args = child_args; |
lb_policy_args.channel_control_helper = |
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
child_policy_ = |
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( |
"pick_first", std::move(lb_policy_args)); |
if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) { |
gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p", |
endpoint_list_->tracer_, endpoint_list_->policy_.get(), this, |
child_policy_.get()); |
} |
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set( |
child_policy_->interested_parties(), |
endpoint_list_->policy_->interested_parties()); |
// Construct pick_first config.
auto config = |
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( |
Json::FromArray( |
{Json::FromObject({{"pick_first", Json::FromObject({})}})})); |
GPR_ASSERT(config.ok()); |
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args; |
update_args.addresses.emplace().emplace_back(address); |
update_args.args = child_args; |
update_args.config = std::move(*config); |
// TODO(roth): If the child reports a non-OK status with the update,
// we need to propagate that back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args)); |
} |
void EndpointList::Endpoint::Orphan() { |
// Remove pollset_set linkage.
grpc_pollset_set_del_pollset_set( |
child_policy_->interested_parties(), |
endpoint_list_->policy_->interested_parties()); |
child_policy_.reset(); |
picker_.reset(); |
Unref(); |
} |
void EndpointList::Endpoint::ResetBackoffLocked() { |
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); |
} |
void EndpointList::Endpoint::ExitIdleLocked() { |
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); |
} |
size_t EndpointList::Endpoint::Index() const { |
for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) { |
if (endpoint_list_->endpoints_[i].get() == this) return i; |
} |
return -1; |
} |
RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel( |
ServerAddress address, const ChannelArgs& args) { |
return endpoint_list_->channel_control_helper()->CreateSubchannel( |
std::move(address), args); |
} |
// EndpointList
void EndpointList::Init( |
const ServerAddressList& addresses, const ChannelArgs& args, |
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
RefCountedPtr<EndpointList>, const ServerAddress&, const ChannelArgs&)> |
create_endpoint) { |
for (const ServerAddress& address : addresses) { |
endpoints_.push_back( |
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args)); |
} |
} |
void EndpointList::ResetBackoffLocked() { |
for (const auto& endpoint : endpoints_) { |
endpoint->ResetBackoffLocked(); |
} |
} |
bool EndpointList::AllEndpointsSeenInitialState() const { |
for (const auto& endpoint : endpoints_) { |
if (!endpoint->connectivity_state().has_value()) return false; |
} |
return true; |
} |
} // namespace grpc_core
@ -0,0 +1,212 @@ |
// Copyright 2015 gRPC authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h> |
#include <stdlib.h> |
#include <memory> |
#include <utility> |
#include <vector> |
#include "absl/functional/any_invocable.h" |
#include "absl/status/status.h" |
#include "absl/types/optional.h" |
#include <grpc/impl/connectivity_state.h> |
#include "src/core/lib/channel/channel_args.h" |
#include "src/core/lib/gprpp/debug_location.h" |
#include "src/core/lib/gprpp/orphanable.h" |
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
#include "src/core/lib/gprpp/work_serializer.h" |
#include "src/core/lib/load_balancing/lb_policy.h" |
#include "src/core/lib/load_balancing/subchannel_interface.h" |
#include "src/core/lib/resolver/server_address.h" |
namespace grpc_core { |
// A list of endpoints for use in a petiole LB policy. Each endpoint may
// have one or more addresses, which will be passed down to a pick_first
// child policy.
// To use this, a petiole policy must define its own subclass of both
// EndpointList and EndpointList::Endpoint, like so:
class MyEndpointList : public EndpointList { |
public: |
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy, |
const ServerAddressList& addresses, const ChannelArgs& args) |
: EndpointList(std::move(lb_policy), |
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) |
? "MyEndpointList" |
: nullptr) { |
Init(addresses, args, |
[&](RefCountedPtr<MyEndpointList> endpoint_list, |
const ServerAddress& address, const ChannelArgs& args) { |
return MakeOrphanable<MyEndpoint>( |
std::move(endpoint_list), address, args, |
policy<MyLbPolicy>()->work_serializer()); |
}); |
} |
private: |
class MyEndpoint : public Endpoint { |
public: |
MyEndpoint(RefCountedPtr<MyEndpointList> endpoint_list, |
const ServerAddress& address, const ChannelArgs& args, |
std::shared_ptr<WorkSerializer> work_serializer) |
: Endpoint(std::move(endpoint_list)) { |
Init(address, args, std::move(work_serializer)); |
} |
private: |
void OnStateUpdate( |
absl::optional<grpc_connectivity_state> old_state, |
grpc_connectivity_state new_state, |
const absl::Status& status) override { |
// ...handle connectivity state change...
} |
}; |
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
const override { |
return policy<MyLbPolicy>()->channel_control_helper(); |
} |
}; |
*/ |
class EndpointList : public InternallyRefCounted<EndpointList> { |
public: |
// An individual endpoint.
class Endpoint : public InternallyRefCounted<Endpoint> { |
public: |
~Endpoint() override { endpoint_list_.reset(DEBUG_LOCATION, "Endpoint"); } |
void Orphan() override; |
void ResetBackoffLocked(); |
void ExitIdleLocked(); |
absl::optional<grpc_connectivity_state> connectivity_state() const { |
return connectivity_state_; |
} |
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker() const { |
return picker_; |
} |
protected: |
// We use two-phase initialization here to ensure that the vtable is
// initialized before we need to use it. Subclass must invoke Init()
// from inside its ctor.
explicit Endpoint(RefCountedPtr<EndpointList> endpoint_list) |
: endpoint_list_(std::move(endpoint_list)) {} |
void Init(const ServerAddress& address, const ChannelArgs& args, |
std::shared_ptr<WorkSerializer> work_serializer); |
// Templated for convenience, to provide a short-hand for
// down-casting in the caller.
template <typename T> |
T* endpoint_list() const { |
return static_cast<T*>(endpoint_list_.get()); |
} |
// Templated for convenience, to provide a short-hand for down-casting
// in the caller.
template <typename T> |
T* policy() const { |
return endpoint_list_->policy<T>(); |
} |
// Returns the index of this endpoint within the EndpointList.
// Intended for trace logging.
size_t Index() const; |
private: |
class Helper; |
// Called when the child policy reports a connectivity state update.
virtual void OnStateUpdate( |
absl::optional<grpc_connectivity_state> old_state, |
grpc_connectivity_state new_state, const absl::Status& status) = 0; |
// Called to create a subchannel. Subclasses may override.
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel( |
ServerAddress address, const ChannelArgs& args); |
RefCountedPtr<EndpointList> endpoint_list_; |
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
absl::optional<grpc_connectivity_state> connectivity_state_; |
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
}; |
~EndpointList() override { policy_.reset(DEBUG_LOCATION, "EndpointList"); } |
void Orphan() override { |
endpoints_.clear(); |
Unref(); |
} |
size_t size() const { return endpoints_.size(); } |
const std::vector<OrphanablePtr<Endpoint>>& endpoints() const { |
return endpoints_; |
} |
void ResetBackoffLocked(); |
protected: |
// We use two-phase initialization here to ensure that the vtable is
// initialized before we need to use it. Subclass must invoke Init()
// from inside its ctor.
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer) |
: policy_(std::move(policy)), tracer_(tracer) {} |
void Init(const ServerAddressList& addresses, const ChannelArgs& args, |
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
RefCountedPtr<EndpointList>, const ServerAddress&, |
const ChannelArgs&)> |
create_endpoint); |
// Templated for convenience, to provide a short-hand for down-casting
// in the caller.
template <typename T> |
T* policy() const { |
return static_cast<T*>(policy_.get()); |
} |
// Returns true if all endpoints have seen their initial connectivity
// state notification.
bool AllEndpointsSeenInitialState() const; |
private: |
// Returns the parent policy's helper. Needed because the accessor
// method is protected on LoadBalancingPolicy.
virtual LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
const = 0; |
RefCountedPtr<LoadBalancingPolicy> policy_; |
const char* tracer_; |
std::vector<OrphanablePtr<Endpoint>> endpoints_; |
}; |
} // namespace grpc_core
@ -0,0 +1,36 @@ |
// Copyright 2023 gRPC authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h> |
#include "src/core/lib/resolver/server_address.h" |
// Internal channel arg to enable health checking in pick_first.
// Intended to be used by petiole policies (e.g., round_robin) that
// delegate to pick_first.
GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_enable_health_checking" |
// Internal channel arg to tell pick_first to omit the prefix it normally
// adds to error status messages. Intended to be used by petiole policies
// (e.g., round_robin) that want to add their own prefixes.
GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_omit_status_message_prefix" |
Reference in new issue