Revert "[round_robin] delegate to pick_first as per dualstack design" (#34317)
Reverts grpc/grpc#34241pull/34322/head
parent
ce78793ff3
commit
e6bf7c12cf
25 changed files with 262 additions and 693 deletions
@ -1,188 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" |
|
||||||
|
|
||||||
#include <stdlib.h> |
|
||||||
|
|
||||||
#include <algorithm> |
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/status/statusor.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/impl/connectivity_state.h> |
|
||||||
#include <grpc/support/json.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/config/core_configuration.h" |
|
||||||
#include "src/core/lib/gprpp/debug_location.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/iomgr/pollset_set.h" |
|
||||||
#include "src/core/lib/json/json.h" |
|
||||||
#include "src/core/lib/load_balancing/delegating_helper.h" |
|
||||||
#include "src/core/lib/load_balancing/lb_policy.h" |
|
||||||
#include "src/core/lib/load_balancing/lb_policy_registry.h" |
|
||||||
#include "src/core/lib/resolver/server_address.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
//
|
|
||||||
// EndpointList::Endpoint::Helper
|
|
||||||
//
|
|
||||||
|
|
||||||
class EndpointList::Endpoint::Helper |
|
||||||
: public LoadBalancingPolicy::DelegatingChannelControlHelper { |
|
||||||
public: |
|
||||||
explicit Helper(RefCountedPtr<Endpoint> endpoint) |
|
||||||
: endpoint_(std::move(endpoint)) {} |
|
||||||
|
|
||||||
~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); } |
|
||||||
|
|
||||||
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
||||||
ServerAddress address, const ChannelArgs& args) override { |
|
||||||
return endpoint_->CreateSubchannel(std::move(address), args); |
|
||||||
} |
|
||||||
|
|
||||||
void UpdateState( |
|
||||||
grpc_connectivity_state state, const absl::Status& status, |
|
||||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override { |
|
||||||
auto old_state = std::exchange(endpoint_->connectivity_state_, state); |
|
||||||
endpoint_->picker_ = std::move(picker); |
|
||||||
endpoint_->OnStateUpdate(old_state, state, status); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override { |
|
||||||
return endpoint_->endpoint_list_->channel_control_helper(); |
|
||||||
} |
|
||||||
|
|
||||||
RefCountedPtr<Endpoint> endpoint_; |
|
||||||
}; |
|
||||||
|
|
||||||
//
|
|
||||||
// EndpointList::Endpoint
|
|
||||||
//
|
|
||||||
|
|
||||||
void EndpointList::Endpoint::Init( |
|
||||||
const ServerAddress& address, const ChannelArgs& args, |
|
||||||
std::shared_ptr<WorkSerializer> work_serializer) { |
|
||||||
ChannelArgs child_args = |
|
||||||
args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true) |
|
||||||
.Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true); |
|
||||||
LoadBalancingPolicy::Args lb_policy_args; |
|
||||||
lb_policy_args.work_serializer = std::move(work_serializer); |
|
||||||
lb_policy_args.args = child_args; |
|
||||||
lb_policy_args.channel_control_helper = |
|
||||||
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
|
||||||
child_policy_ = |
|
||||||
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( |
|
||||||
"pick_first", std::move(lb_policy_args)); |
|
||||||
if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) { |
|
||||||
gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p", |
|
||||||
endpoint_list_->tracer_, endpoint_list_->policy_.get(), this, |
|
||||||
child_policy_.get()); |
|
||||||
} |
|
||||||
// Add our interested_parties pollset_set to that of the newly created
|
|
||||||
// child policy. This will make the child policy progress upon activity on
|
|
||||||
// this policy, which in turn is tied to the application's call.
|
|
||||||
grpc_pollset_set_add_pollset_set( |
|
||||||
child_policy_->interested_parties(), |
|
||||||
endpoint_list_->policy_->interested_parties()); |
|
||||||
// Construct pick_first config.
|
|
||||||
auto config = |
|
||||||
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( |
|
||||||
Json::FromArray( |
|
||||||
{Json::FromObject({{"pick_first", Json::FromObject({})}})})); |
|
||||||
GPR_ASSERT(config.ok()); |
|
||||||
// Update child policy.
|
|
||||||
LoadBalancingPolicy::UpdateArgs update_args; |
|
||||||
update_args.addresses.emplace().emplace_back(address); |
|
||||||
update_args.args = child_args; |
|
||||||
update_args.config = std::move(*config); |
|
||||||
// TODO(roth): If the child reports a non-OK status with the update,
|
|
||||||
// we need to propagate that back to the resolver somehow.
|
|
||||||
(void)child_policy_->UpdateLocked(std::move(update_args)); |
|
||||||
} |
|
||||||
|
|
||||||
void EndpointList::Endpoint::Orphan() { |
|
||||||
// Remove pollset_set linkage.
|
|
||||||
grpc_pollset_set_del_pollset_set( |
|
||||||
child_policy_->interested_parties(), |
|
||||||
endpoint_list_->policy_->interested_parties()); |
|
||||||
child_policy_.reset(); |
|
||||||
picker_.reset(); |
|
||||||
Unref(); |
|
||||||
} |
|
||||||
|
|
||||||
void EndpointList::Endpoint::ResetBackoffLocked() { |
|
||||||
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); |
|
||||||
} |
|
||||||
|
|
||||||
void EndpointList::Endpoint::ExitIdleLocked() { |
|
||||||
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); |
|
||||||
} |
|
||||||
|
|
||||||
size_t EndpointList::Endpoint::Index() const { |
|
||||||
for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) { |
|
||||||
if (endpoint_list_->endpoints_[i].get() == this) return i; |
|
||||||
} |
|
||||||
return -1; |
|
||||||
} |
|
||||||
|
|
||||||
RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel( |
|
||||||
ServerAddress address, const ChannelArgs& args) { |
|
||||||
return endpoint_list_->channel_control_helper()->CreateSubchannel( |
|
||||||
std::move(address), args); |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// EndpointList
|
|
||||||
//
|
|
||||||
|
|
||||||
void EndpointList::Init( |
|
||||||
const ServerAddressList& addresses, const ChannelArgs& args, |
|
||||||
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
|
||||||
RefCountedPtr<EndpointList>, const ServerAddress&, const ChannelArgs&)> |
|
||||||
create_endpoint) { |
|
||||||
for (const ServerAddress& address : addresses) { |
|
||||||
endpoints_.push_back( |
|
||||||
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args)); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void EndpointList::ResetBackoffLocked() { |
|
||||||
for (const auto& endpoint : endpoints_) { |
|
||||||
endpoint->ResetBackoffLocked(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
bool EndpointList::AllEndpointsSeenInitialState() const { |
|
||||||
for (const auto& endpoint : endpoints_) { |
|
||||||
if (!endpoint->connectivity_state().has_value()) return false; |
|
||||||
} |
|
||||||
return true; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
@ -1,214 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H |
|
||||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stdlib.h> |
|
||||||
|
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
#include <vector> |
|
||||||
|
|
||||||
#include "absl/functional/any_invocable.h" |
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/impl/connectivity_state.h> |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/gprpp/debug_location.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/gprpp/work_serializer.h" |
|
||||||
#include "src/core/lib/load_balancing/lb_policy.h" |
|
||||||
#include "src/core/lib/load_balancing/subchannel_interface.h" |
|
||||||
#include "src/core/lib/resolver/server_address.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
// A list of endpoints for use in a petiole LB policy. Each endpoint may
|
|
||||||
// have one or more addresses, which will be passed down to a pick_first
|
|
||||||
// child policy.
|
|
||||||
//
|
|
||||||
// To use this, a petiole policy must define its own subclass of both
|
|
||||||
// EndpointList and EndpointList::Endpoint, like so:
|
|
||||||
/*
|
|
||||||
class MyEndpointList : public EndpointList { |
|
||||||
public: |
|
||||||
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy, |
|
||||||
const ServerAddressList& addresses, const ChannelArgs& args) |
|
||||||
: EndpointList(std::move(lb_policy), |
|
||||||
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) |
|
||||||
? "MyEndpointList" |
|
||||||
: nullptr) { |
|
||||||
Init(addresses, args, |
|
||||||
[&](RefCountedPtr<MyEndpointList> endpoint_list, |
|
||||||
const ServerAddress& address, const ChannelArgs& args) { |
|
||||||
return MakeOrphanable<MyEndpoint>( |
|
||||||
std::move(endpoint_list), address, args, |
|
||||||
policy<MyLbPolicy>()->work_serializer()); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
class MyEndpoint : public Endpoint { |
|
||||||
public: |
|
||||||
MyEndpoint(RefCountedPtr<MyEndpointList> endpoint_list, |
|
||||||
const ServerAddress& address, const ChannelArgs& args, |
|
||||||
std::shared_ptr<WorkSerializer> work_serializer) |
|
||||||
: Endpoint(std::move(endpoint_list)) { |
|
||||||
Init(address, args, std::move(work_serializer)); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
void OnStateUpdate( |
|
||||||
absl::optional<grpc_connectivity_state> old_state, |
|
||||||
grpc_connectivity_state new_state, |
|
||||||
const absl::Status& status) override { |
|
||||||
// ...handle connectivity state change...
|
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
|
||||||
const override { |
|
||||||
return policy<MyLbPolicy>()->channel_control_helper(); |
|
||||||
} |
|
||||||
}; |
|
||||||
*/ |
|
||||||
// TODO(roth): Consider wrapping this in an LB policy subclass for petiole
|
|
||||||
// policies to inherit from.
|
|
||||||
class EndpointList : public InternallyRefCounted<EndpointList> { |
|
||||||
public: |
|
||||||
// An individual endpoint.
|
|
||||||
class Endpoint : public InternallyRefCounted<Endpoint> { |
|
||||||
public: |
|
||||||
~Endpoint() override { endpoint_list_.reset(DEBUG_LOCATION, "Endpoint"); } |
|
||||||
|
|
||||||
void Orphan() override; |
|
||||||
|
|
||||||
void ResetBackoffLocked(); |
|
||||||
void ExitIdleLocked(); |
|
||||||
|
|
||||||
absl::optional<grpc_connectivity_state> connectivity_state() const { |
|
||||||
return connectivity_state_; |
|
||||||
} |
|
||||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker() const { |
|
||||||
return picker_; |
|
||||||
} |
|
||||||
|
|
||||||
protected: |
|
||||||
// We use two-phase initialization here to ensure that the vtable is
|
|
||||||
// initialized before we need to use it. Subclass must invoke Init()
|
|
||||||
// from inside its ctor.
|
|
||||||
explicit Endpoint(RefCountedPtr<EndpointList> endpoint_list) |
|
||||||
: endpoint_list_(std::move(endpoint_list)) {} |
|
||||||
|
|
||||||
void Init(const ServerAddress& address, const ChannelArgs& args, |
|
||||||
std::shared_ptr<WorkSerializer> work_serializer); |
|
||||||
|
|
||||||
// Templated for convenience, to provide a short-hand for
|
|
||||||
// down-casting in the caller.
|
|
||||||
template <typename T> |
|
||||||
T* endpoint_list() const { |
|
||||||
return static_cast<T*>(endpoint_list_.get()); |
|
||||||
} |
|
||||||
|
|
||||||
// Templated for convenience, to provide a short-hand for down-casting
|
|
||||||
// in the caller.
|
|
||||||
template <typename T> |
|
||||||
T* policy() const { |
|
||||||
return endpoint_list_->policy<T>(); |
|
||||||
} |
|
||||||
|
|
||||||
// Returns the index of this endpoint within the EndpointList.
|
|
||||||
// Intended for trace logging.
|
|
||||||
size_t Index() const; |
|
||||||
|
|
||||||
private: |
|
||||||
class Helper; |
|
||||||
|
|
||||||
// Called when the child policy reports a connectivity state update.
|
|
||||||
virtual void OnStateUpdate( |
|
||||||
absl::optional<grpc_connectivity_state> old_state, |
|
||||||
grpc_connectivity_state new_state, const absl::Status& status) = 0; |
|
||||||
|
|
||||||
// Called to create a subchannel. Subclasses may override.
|
|
||||||
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
||||||
ServerAddress address, const ChannelArgs& args); |
|
||||||
|
|
||||||
RefCountedPtr<EndpointList> endpoint_list_; |
|
||||||
|
|
||||||
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
||||||
absl::optional<grpc_connectivity_state> connectivity_state_; |
|
||||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
|
||||||
}; |
|
||||||
|
|
||||||
~EndpointList() override { policy_.reset(DEBUG_LOCATION, "EndpointList"); } |
|
||||||
|
|
||||||
void Orphan() override { |
|
||||||
endpoints_.clear(); |
|
||||||
Unref(); |
|
||||||
} |
|
||||||
|
|
||||||
size_t size() const { return endpoints_.size(); } |
|
||||||
|
|
||||||
const std::vector<OrphanablePtr<Endpoint>>& endpoints() const { |
|
||||||
return endpoints_; |
|
||||||
} |
|
||||||
|
|
||||||
void ResetBackoffLocked(); |
|
||||||
|
|
||||||
protected: |
|
||||||
// We use two-phase initialization here to ensure that the vtable is
|
|
||||||
// initialized before we need to use it. Subclass must invoke Init()
|
|
||||||
// from inside its ctor.
|
|
||||||
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer) |
|
||||||
: policy_(std::move(policy)), tracer_(tracer) {} |
|
||||||
|
|
||||||
void Init(const ServerAddressList& addresses, const ChannelArgs& args, |
|
||||||
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
|
||||||
RefCountedPtr<EndpointList>, const ServerAddress&, |
|
||||||
const ChannelArgs&)> |
|
||||||
create_endpoint); |
|
||||||
|
|
||||||
// Templated for convenience, to provide a short-hand for down-casting
|
|
||||||
// in the caller.
|
|
||||||
template <typename T> |
|
||||||
T* policy() const { |
|
||||||
return static_cast<T*>(policy_.get()); |
|
||||||
} |
|
||||||
|
|
||||||
// Returns true if all endpoints have seen their initial connectivity
|
|
||||||
// state notification.
|
|
||||||
bool AllEndpointsSeenInitialState() const; |
|
||||||
|
|
||||||
private: |
|
||||||
// Returns the parent policy's helper. Needed because the accessor
|
|
||||||
// method is protected on LoadBalancingPolicy.
|
|
||||||
virtual LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
|
||||||
const = 0; |
|
||||||
|
|
||||||
RefCountedPtr<LoadBalancingPolicy> policy_; |
|
||||||
const char* tracer_; |
|
||||||
std::vector<OrphanablePtr<Endpoint>> endpoints_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H
|
|
Loading…
Reference in new issue