mirror of https://github.com/grpc/grpc.git
[round_robin] 4th attempt: delegate to pick_first as per dualstack design (#34337)
Most recent attempt was #34320, reverted in #34335. The first commit here is a pure revert. The second commit fixes the outlier_detection unit test to pass both with and without the experiment.pull/34346/head
parent
0b8fb5a3c6
commit
1986007e1e
28 changed files with 997 additions and 43 deletions
@ -0,0 +1,188 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2015 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" |
||||||
|
|
||||||
|
#include <stdlib.h> |
||||||
|
|
||||||
|
#include <algorithm> |
||||||
|
#include <memory> |
||||||
|
#include <utility> |
||||||
|
#include <vector> |
||||||
|
|
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/status/statusor.h" |
||||||
|
#include "absl/types/optional.h" |
||||||
|
|
||||||
|
#include <grpc/impl/connectivity_state.h> |
||||||
|
#include <grpc/support/json.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
#include "src/core/lib/gprpp/debug_location.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/pollset_set.h" |
||||||
|
#include "src/core/lib/json/json.h" |
||||||
|
#include "src/core/lib/load_balancing/delegating_helper.h" |
||||||
|
#include "src/core/lib/load_balancing/lb_policy.h" |
||||||
|
#include "src/core/lib/load_balancing/lb_policy_registry.h" |
||||||
|
#include "src/core/lib/resolver/server_address.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
//
|
||||||
|
// EndpointList::Endpoint::Helper
|
||||||
|
//
|
||||||
|
|
||||||
|
class EndpointList::Endpoint::Helper |
||||||
|
: public LoadBalancingPolicy::DelegatingChannelControlHelper { |
||||||
|
public: |
||||||
|
explicit Helper(RefCountedPtr<Endpoint> endpoint) |
||||||
|
: endpoint_(std::move(endpoint)) {} |
||||||
|
|
||||||
|
~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
ServerAddress address, const ChannelArgs& args) override { |
||||||
|
return endpoint_->CreateSubchannel(std::move(address), args); |
||||||
|
} |
||||||
|
|
||||||
|
void UpdateState( |
||||||
|
grpc_connectivity_state state, const absl::Status& status, |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override { |
||||||
|
auto old_state = std::exchange(endpoint_->connectivity_state_, state); |
||||||
|
endpoint_->picker_ = std::move(picker); |
||||||
|
endpoint_->OnStateUpdate(old_state, state, status); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override { |
||||||
|
return endpoint_->endpoint_list_->channel_control_helper(); |
||||||
|
} |
||||||
|
|
||||||
|
RefCountedPtr<Endpoint> endpoint_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// EndpointList::Endpoint
|
||||||
|
//
|
||||||
|
|
||||||
|
void EndpointList::Endpoint::Init( |
||||||
|
const ServerAddress& address, const ChannelArgs& args, |
||||||
|
std::shared_ptr<WorkSerializer> work_serializer) { |
||||||
|
ChannelArgs child_args = |
||||||
|
args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true) |
||||||
|
.Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true); |
||||||
|
LoadBalancingPolicy::Args lb_policy_args; |
||||||
|
lb_policy_args.work_serializer = std::move(work_serializer); |
||||||
|
lb_policy_args.args = child_args; |
||||||
|
lb_policy_args.channel_control_helper = |
||||||
|
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
||||||
|
child_policy_ = |
||||||
|
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( |
||||||
|
"pick_first", std::move(lb_policy_args)); |
||||||
|
if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) { |
||||||
|
gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p", |
||||||
|
endpoint_list_->tracer_, endpoint_list_->policy_.get(), this, |
||||||
|
child_policy_.get()); |
||||||
|
} |
||||||
|
// Add our interested_parties pollset_set to that of the newly created
|
||||||
|
// child policy. This will make the child policy progress upon activity on
|
||||||
|
// this policy, which in turn is tied to the application's call.
|
||||||
|
grpc_pollset_set_add_pollset_set( |
||||||
|
child_policy_->interested_parties(), |
||||||
|
endpoint_list_->policy_->interested_parties()); |
||||||
|
// Construct pick_first config.
|
||||||
|
auto config = |
||||||
|
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( |
||||||
|
Json::FromArray( |
||||||
|
{Json::FromObject({{"pick_first", Json::FromObject({})}})})); |
||||||
|
GPR_ASSERT(config.ok()); |
||||||
|
// Update child policy.
|
||||||
|
LoadBalancingPolicy::UpdateArgs update_args; |
||||||
|
update_args.addresses.emplace().emplace_back(address); |
||||||
|
update_args.args = child_args; |
||||||
|
update_args.config = std::move(*config); |
||||||
|
// TODO(roth): If the child reports a non-OK status with the update,
|
||||||
|
// we need to propagate that back to the resolver somehow.
|
||||||
|
(void)child_policy_->UpdateLocked(std::move(update_args)); |
||||||
|
} |
||||||
|
|
||||||
|
void EndpointList::Endpoint::Orphan() { |
||||||
|
// Remove pollset_set linkage.
|
||||||
|
grpc_pollset_set_del_pollset_set( |
||||||
|
child_policy_->interested_parties(), |
||||||
|
endpoint_list_->policy_->interested_parties()); |
||||||
|
child_policy_.reset(); |
||||||
|
picker_.reset(); |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
void EndpointList::Endpoint::ResetBackoffLocked() { |
||||||
|
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void EndpointList::Endpoint::ExitIdleLocked() { |
||||||
|
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
size_t EndpointList::Endpoint::Index() const { |
||||||
|
for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) { |
||||||
|
if (endpoint_list_->endpoints_[i].get() == this) return i; |
||||||
|
} |
||||||
|
return -1; |
||||||
|
} |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel( |
||||||
|
ServerAddress address, const ChannelArgs& args) { |
||||||
|
return endpoint_list_->channel_control_helper()->CreateSubchannel( |
||||||
|
std::move(address), args); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// EndpointList
|
||||||
|
//
|
||||||
|
|
||||||
|
void EndpointList::Init( |
||||||
|
const ServerAddressList& addresses, const ChannelArgs& args, |
||||||
|
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
||||||
|
RefCountedPtr<EndpointList>, const ServerAddress&, const ChannelArgs&)> |
||||||
|
create_endpoint) { |
||||||
|
for (const ServerAddress& address : addresses) { |
||||||
|
endpoints_.push_back( |
||||||
|
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void EndpointList::ResetBackoffLocked() { |
||||||
|
for (const auto& endpoint : endpoints_) { |
||||||
|
endpoint->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
bool EndpointList::AllEndpointsSeenInitialState() const { |
||||||
|
for (const auto& endpoint : endpoints_) { |
||||||
|
if (!endpoint->connectivity_state().has_value()) return false; |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,214 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2015 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H |
||||||
|
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <stdlib.h> |
||||||
|
|
||||||
|
#include <memory> |
||||||
|
#include <utility> |
||||||
|
#include <vector> |
||||||
|
|
||||||
|
#include "absl/functional/any_invocable.h" |
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/types/optional.h" |
||||||
|
|
||||||
|
#include <grpc/impl/connectivity_state.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gprpp/debug_location.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/gprpp/work_serializer.h" |
||||||
|
#include "src/core/lib/load_balancing/lb_policy.h" |
||||||
|
#include "src/core/lib/load_balancing/subchannel_interface.h" |
||||||
|
#include "src/core/lib/resolver/server_address.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// A list of endpoints for use in a petiole LB policy. Each endpoint may
|
||||||
|
// have one or more addresses, which will be passed down to a pick_first
|
||||||
|
// child policy.
|
||||||
|
//
|
||||||
|
// To use this, a petiole policy must define its own subclass of both
|
||||||
|
// EndpointList and EndpointList::Endpoint, like so:
|
||||||
|
/*
|
||||||
|
class MyEndpointList : public EndpointList { |
||||||
|
public: |
||||||
|
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy, |
||||||
|
const ServerAddressList& addresses, const ChannelArgs& args) |
||||||
|
: EndpointList(std::move(lb_policy), |
||||||
|
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) |
||||||
|
? "MyEndpointList" |
||||||
|
: nullptr) { |
||||||
|
Init(addresses, args, |
||||||
|
[&](RefCountedPtr<MyEndpointList> endpoint_list, |
||||||
|
const ServerAddress& address, const ChannelArgs& args) { |
||||||
|
return MakeOrphanable<MyEndpoint>( |
||||||
|
std::move(endpoint_list), address, args, |
||||||
|
policy<MyLbPolicy>()->work_serializer()); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class MyEndpoint : public Endpoint { |
||||||
|
public: |
||||||
|
MyEndpoint(RefCountedPtr<MyEndpointList> endpoint_list, |
||||||
|
const ServerAddress& address, const ChannelArgs& args, |
||||||
|
std::shared_ptr<WorkSerializer> work_serializer) |
||||||
|
: Endpoint(std::move(endpoint_list)) { |
||||||
|
Init(address, args, std::move(work_serializer)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
void OnStateUpdate( |
||||||
|
absl::optional<grpc_connectivity_state> old_state, |
||||||
|
grpc_connectivity_state new_state, |
||||||
|
const absl::Status& status) override { |
||||||
|
// ...handle connectivity state change...
|
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
||||||
|
const override { |
||||||
|
return policy<MyLbPolicy>()->channel_control_helper(); |
||||||
|
} |
||||||
|
}; |
||||||
|
*/ |
||||||
|
// TODO(roth): Consider wrapping this in an LB policy subclass for petiole
|
||||||
|
// policies to inherit from.
|
||||||
|
class EndpointList : public InternallyRefCounted<EndpointList> { |
||||||
|
public: |
||||||
|
// An individual endpoint.
|
||||||
|
class Endpoint : public InternallyRefCounted<Endpoint> { |
||||||
|
public: |
||||||
|
~Endpoint() override { endpoint_list_.reset(DEBUG_LOCATION, "Endpoint"); } |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
void ResetBackoffLocked(); |
||||||
|
void ExitIdleLocked(); |
||||||
|
|
||||||
|
absl::optional<grpc_connectivity_state> connectivity_state() const { |
||||||
|
return connectivity_state_; |
||||||
|
} |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker() const { |
||||||
|
return picker_; |
||||||
|
} |
||||||
|
|
||||||
|
protected: |
||||||
|
// We use two-phase initialization here to ensure that the vtable is
|
||||||
|
// initialized before we need to use it. Subclass must invoke Init()
|
||||||
|
// from inside its ctor.
|
||||||
|
explicit Endpoint(RefCountedPtr<EndpointList> endpoint_list) |
||||||
|
: endpoint_list_(std::move(endpoint_list)) {} |
||||||
|
|
||||||
|
void Init(const ServerAddress& address, const ChannelArgs& args, |
||||||
|
std::shared_ptr<WorkSerializer> work_serializer); |
||||||
|
|
||||||
|
// Templated for convenience, to provide a short-hand for
|
||||||
|
// down-casting in the caller.
|
||||||
|
template <typename T> |
||||||
|
T* endpoint_list() const { |
||||||
|
return static_cast<T*>(endpoint_list_.get()); |
||||||
|
} |
||||||
|
|
||||||
|
// Templated for convenience, to provide a short-hand for down-casting
|
||||||
|
// in the caller.
|
||||||
|
template <typename T> |
||||||
|
T* policy() const { |
||||||
|
return endpoint_list_->policy<T>(); |
||||||
|
} |
||||||
|
|
||||||
|
// Returns the index of this endpoint within the EndpointList.
|
||||||
|
// Intended for trace logging.
|
||||||
|
size_t Index() const; |
||||||
|
|
||||||
|
private: |
||||||
|
class Helper; |
||||||
|
|
||||||
|
// Called when the child policy reports a connectivity state update.
|
||||||
|
virtual void OnStateUpdate( |
||||||
|
absl::optional<grpc_connectivity_state> old_state, |
||||||
|
grpc_connectivity_state new_state, const absl::Status& status) = 0; |
||||||
|
|
||||||
|
// Called to create a subchannel. Subclasses may override.
|
||||||
|
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
ServerAddress address, const ChannelArgs& args); |
||||||
|
|
||||||
|
RefCountedPtr<EndpointList> endpoint_list_; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||||
|
absl::optional<grpc_connectivity_state> connectivity_state_; |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
~EndpointList() override { policy_.reset(DEBUG_LOCATION, "EndpointList"); } |
||||||
|
|
||||||
|
void Orphan() override { |
||||||
|
endpoints_.clear(); |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
size_t size() const { return endpoints_.size(); } |
||||||
|
|
||||||
|
const std::vector<OrphanablePtr<Endpoint>>& endpoints() const { |
||||||
|
return endpoints_; |
||||||
|
} |
||||||
|
|
||||||
|
void ResetBackoffLocked(); |
||||||
|
|
||||||
|
protected: |
||||||
|
// We use two-phase initialization here to ensure that the vtable is
|
||||||
|
// initialized before we need to use it. Subclass must invoke Init()
|
||||||
|
// from inside its ctor.
|
||||||
|
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer) |
||||||
|
: policy_(std::move(policy)), tracer_(tracer) {} |
||||||
|
|
||||||
|
void Init(const ServerAddressList& addresses, const ChannelArgs& args, |
||||||
|
absl::AnyInvocable<OrphanablePtr<Endpoint>( |
||||||
|
RefCountedPtr<EndpointList>, const ServerAddress&, |
||||||
|
const ChannelArgs&)> |
||||||
|
create_endpoint); |
||||||
|
|
||||||
|
// Templated for convenience, to provide a short-hand for down-casting
|
||||||
|
// in the caller.
|
||||||
|
template <typename T> |
||||||
|
T* policy() const { |
||||||
|
return static_cast<T*>(policy_.get()); |
||||||
|
} |
||||||
|
|
||||||
|
// Returns true if all endpoints have seen their initial connectivity
|
||||||
|
// state notification.
|
||||||
|
bool AllEndpointsSeenInitialState() const; |
||||||
|
|
||||||
|
private: |
||||||
|
// Returns the parent policy's helper. Needed because the accessor
|
||||||
|
// method is protected on LoadBalancingPolicy.
|
||||||
|
virtual LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() |
||||||
|
const = 0; |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy> policy_; |
||||||
|
const char* tracer_; |
||||||
|
std::vector<OrphanablePtr<Endpoint>> endpoints_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H
|
Loading…
Reference in new issue