commit
00458b3851
162 changed files with 6228 additions and 4382 deletions
@ -0,0 +1,83 @@ |
||||
//
|
||||
// Copyright 2020 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
|
||||
#define GRPC_ARG_HIERARCHICAL_PATH "grpc.internal.address.hierarchical_path" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
void* HierarchicalPathCopy(void* p) { |
||||
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p); |
||||
return static_cast<void*>(new std::vector<std::string>(*path)); |
||||
} |
||||
|
||||
void HierarchicalPathDestroy(void* p) { |
||||
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p); |
||||
delete path; |
||||
} |
||||
|
||||
int HierarchicalPathCompare(void* p1, void* p2) { |
||||
std::vector<std::string>* path1 = static_cast<std::vector<std::string>*>(p1); |
||||
std::vector<std::string>* path2 = static_cast<std::vector<std::string>*>(p2); |
||||
for (size_t i = 0; i < path1->size(); ++i) { |
||||
if (path2->size() == i) return 1; |
||||
int r = (*path1)[i].compare((*path2)[i]); |
||||
if (r != 0) return r; |
||||
} |
||||
if (path2->size() > path1->size()) return -1; |
||||
return 0; |
||||
} |
||||
|
||||
const grpc_arg_pointer_vtable hierarchical_path_arg_vtable = { |
||||
HierarchicalPathCopy, HierarchicalPathDestroy, HierarchicalPathCompare}; |
||||
|
||||
} // namespace
|
||||
|
||||
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path) { |
||||
return grpc_channel_arg_pointer_create( |
||||
const_cast<char*>(GRPC_ARG_HIERARCHICAL_PATH), |
||||
const_cast<std::vector<std::string>*>(&path), |
||||
&hierarchical_path_arg_vtable); |
||||
} |
||||
|
||||
HierarchicalAddressMap MakeHierarchicalAddressMap( |
||||
const ServerAddressList& addresses) { |
||||
HierarchicalAddressMap result; |
||||
for (const ServerAddress& address : addresses) { |
||||
auto* path = grpc_channel_args_find_pointer<std::vector<std::string>>( |
||||
address.args(), GRPC_ARG_HIERARCHICAL_PATH); |
||||
if (path == nullptr || path->empty()) continue; |
||||
auto it = path->begin(); |
||||
ServerAddressList& target_list = result[*it]; |
||||
++it; |
||||
std::vector<std::string> remaining_path(it, path->end()); |
||||
const char* name_to_remove = GRPC_ARG_HIERARCHICAL_PATH; |
||||
grpc_arg new_arg = MakeHierarchicalPathArg(remaining_path); |
||||
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( |
||||
address.args(), &name_to_remove, 1, &new_arg, 1); |
||||
target_list.emplace_back(address.address(), new_args); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,99 @@ |
||||
//
|
||||
// Copyright 2020 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <map> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/ext/filters/client_channel/server_address.h" |
||||
|
||||
// The resolver returns a flat list of addresses. When a hierarchy of
|
||||
// LB policies is in use, each leaf of the hierarchy will need a
|
||||
// different subset of those addresses. This library provides a
|
||||
// mechanism for determining which address is passed to which leaf
|
||||
// policy.
|
||||
//
|
||||
// Each address will have an associated path that indicates which child
|
||||
// it should be sent to at each level of the hierarchy to wind up at the
|
||||
// right leaf policy. Each LB policy will look at the first element of
|
||||
// the path of each address to determine which child to send the address
|
||||
// to. It will then remove that first element when passing the address
|
||||
// down to its child.
|
||||
//
|
||||
// For example, consider the following LB policy hierarchy:
|
||||
//
|
||||
// - priority
|
||||
// - child0 (weighted_target)
|
||||
// - localityA (round_robin)
|
||||
// - localityB (round_robin)
|
||||
// - child1 (weighted_target)
|
||||
// - localityC (round_robin)
|
||||
// - localityD (round_robin)
|
||||
//
|
||||
// Now consider the following addresses:
|
||||
// - 10.0.0.1:80 path=["child0", "localityA"]
|
||||
// - 10.0.0.2:80 path=["child0", "localityB"]
|
||||
// - 10.0.0.3:80 path=["child1", "localityC"]
|
||||
// - 10.0.0.4:80 path=["child1", "localityD"]
|
||||
//
|
||||
// The priority policy will split this up into two lists, one for each
|
||||
// of its children:
|
||||
// - child0:
|
||||
// - 10.0.0.1:80 path=["localityA"]
|
||||
// - 10.0.0.2:80 path=["localityB"]
|
||||
// - child1:
|
||||
// - 10.0.0.3:80 path=["localityC"]
|
||||
// - 10.0.0.4:80 path=["localityD"]
|
||||
//
|
||||
// The weighted_target policy for child0 will split its list up into two
|
||||
// lists, one for each of its children:
|
||||
// - localityA:
|
||||
// - 10.0.0.1:80 path=[]
|
||||
// - localityB:
|
||||
// - 10.0.0.2:80 path=[]
|
||||
//
|
||||
// Similarly, the weighted_target policy for child1 will split its list
|
||||
// up into two lists, one for each of its children:
|
||||
// - localityC:
|
||||
// - 10.0.0.3:80 path=[]
|
||||
// - localityD:
|
||||
// - 10.0.0.4:80 path=[]
|
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Constructs a channel arg containing the hierarchical path
|
||||
// to be associated with an address.
|
||||
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path); |
||||
|
||||
// A map from the next path element to the addresses that fall under
|
||||
// that path element.
|
||||
using HierarchicalAddressMap = std::map<std::string, ServerAddressList>; |
||||
|
||||
// Splits up the addresses into a separate list for each child.
|
||||
HierarchicalAddressMap MakeHierarchicalAddressMap( |
||||
const ServerAddressList& addresses); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H \ |
||||
*/ |
@ -0,0 +1,89 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h" |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
|
||||
// Channel arg key for the list of balancer addresses.
|
||||
#define GRPC_ARG_GRPCLB_BALANCER_ADDRESSES "grpc.grpclb_balancer_addresses" |
||||
// Channel arg key for a string indicating an address's balancer name.
|
||||
#define GRPC_ARG_ADDRESS_BALANCER_NAME "grpc.address_balancer_name" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
void* BalancerAddressesArgCopy(void* p) { |
||||
ServerAddressList* address_list = static_cast<ServerAddressList*>(p); |
||||
return new ServerAddressList(*address_list); |
||||
} |
||||
|
||||
void BalancerAddressesArgDestroy(void* p) { |
||||
ServerAddressList* address_list = static_cast<ServerAddressList*>(p); |
||||
delete address_list; |
||||
} |
||||
|
||||
int BalancerAddressesArgCmp(void* p, void* q) { |
||||
ServerAddressList* address_list1 = static_cast<ServerAddressList*>(p); |
||||
ServerAddressList* address_list2 = static_cast<ServerAddressList*>(q); |
||||
if (address_list1 == nullptr || address_list2 == nullptr) { |
||||
return GPR_ICMP(address_list1, address_list2); |
||||
} |
||||
if (address_list1->size() > address_list2->size()) return 1; |
||||
if (address_list1->size() < address_list2->size()) return -1; |
||||
for (size_t i = 0; i < address_list1->size(); ++i) { |
||||
int retval = (*address_list1)[i].Cmp((*address_list2)[i]); |
||||
if (retval != 0) return retval; |
||||
} |
||||
return 0; |
||||
} |
||||
|
||||
const grpc_arg_pointer_vtable kBalancerAddressesArgVtable = { |
||||
BalancerAddressesArgCopy, BalancerAddressesArgDestroy, |
||||
BalancerAddressesArgCmp}; |
||||
|
||||
} // namespace
|
||||
|
||||
grpc_arg CreateGrpclbBalancerAddressesArg( |
||||
const ServerAddressList* address_list) { |
||||
return grpc_channel_arg_pointer_create( |
||||
const_cast<char*>(GRPC_ARG_GRPCLB_BALANCER_ADDRESSES), |
||||
const_cast<ServerAddressList*>(address_list), |
||||
&kBalancerAddressesArgVtable); |
||||
} |
||||
|
||||
const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs( |
||||
const grpc_channel_args& args) { |
||||
return grpc_channel_args_find_pointer<const ServerAddressList>( |
||||
&args, const_cast<char*>(GRPC_ARG_GRPCLB_BALANCER_ADDRESSES)); |
||||
} |
||||
|
||||
grpc_arg CreateGrpclbBalancerNameArg(const char* balancer_name) { |
||||
return grpc_channel_arg_string_create( |
||||
const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), |
||||
const_cast<char*>(balancer_name)); |
||||
} |
||||
|
||||
const char* FindGrpclbBalancerNameInChannelArgs(const grpc_channel_args& args) { |
||||
return grpc_channel_args_find_string( |
||||
&args, const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME)); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,40 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/server_address.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
grpc_arg CreateGrpclbBalancerAddressesArg( |
||||
const ServerAddressList* address_list); |
||||
const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs( |
||||
const grpc_channel_args& args); |
||||
|
||||
grpc_arg CreateGrpclbBalancerNameArg(const char* balancer_name); |
||||
const char* FindGrpclbBalancerNameInChannelArgs(const grpc_channel_args& args); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* \ |
||||
GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H \
|
||||
*/ |
@ -0,0 +1,870 @@ |
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/iomgr/work_serializer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_lb_priority_trace(false, "priority_lb"); |
||||
|
||||
namespace { |
||||
|
||||
constexpr char kPriority[] = "priority_experimental"; |
||||
|
||||
// How long we keep a child around for after it is no longer being used
|
||||
// (either because it has been removed from the config or because we
|
||||
// have switched to a higher-priority child).
|
||||
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000; |
||||
|
||||
// Default for how long we wait for a newly created child to get connected
|
||||
// before starting to attempt the next priority. Overridable via channel arg.
|
||||
constexpr int kDefaultChildFailoverTimeoutMs = 10000; |
||||
|
||||
// Config for priority LB policy.
|
||||
class PriorityLbConfig : public LoadBalancingPolicy::Config { |
||||
public: |
||||
PriorityLbConfig( |
||||
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> |
||||
children, |
||||
std::vector<std::string> priorities) |
||||
: children_(std::move(children)), priorities_(std::move(priorities)) {} |
||||
|
||||
const char* name() const override { return kPriority; } |
||||
|
||||
const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>& |
||||
children() const { |
||||
return children_; |
||||
} |
||||
const std::vector<std::string>& priorities() const { return priorities_; } |
||||
|
||||
private: |
||||
const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> |
||||
children_; |
||||
const std::vector<std::string> priorities_; |
||||
}; |
||||
|
||||
// priority LB policy.
|
||||
class PriorityLb : public LoadBalancingPolicy { |
||||
public: |
||||
explicit PriorityLb(Args args); |
||||
|
||||
const char* name() const override { return kPriority; } |
||||
|
||||
void UpdateLocked(UpdateArgs args) override; |
||||
void ExitIdleLocked() override; |
||||
void ResetBackoffLocked() override; |
||||
|
||||
private: |
||||
// Each ChildPriority holds a ref to the PriorityLb.
|
||||
class ChildPriority : public InternallyRefCounted<ChildPriority> { |
||||
public: |
||||
ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name); |
||||
|
||||
~ChildPriority() { |
||||
priority_policy_.reset(DEBUG_LOCATION, "ChildPriority"); |
||||
} |
||||
|
||||
const std::string& name() const { return name_; } |
||||
|
||||
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config); |
||||
void ExitIdleLocked(); |
||||
void ResetBackoffLocked(); |
||||
void DeactivateLocked(); |
||||
void MaybeReactivateLocked(); |
||||
void MaybeCancelFailoverTimerLocked(); |
||||
|
||||
void Orphan() override; |
||||
|
||||
std::unique_ptr<SubchannelPicker> GetPicker() { |
||||
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_); |
||||
} |
||||
|
||||
grpc_connectivity_state connectivity_state() const { |
||||
return connectivity_state_; |
||||
} |
||||
bool failover_timer_callback_pending() const { |
||||
return failover_timer_callback_pending_; |
||||
} |
||||
|
||||
private: |
||||
// A simple wrapper for ref-counting a picker from the child policy.
|
||||
class RefCountedPicker : public RefCounted<RefCountedPicker> { |
||||
public: |
||||
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker) |
||||
: picker_(std::move(picker)) {} |
||||
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||
|
||||
private: |
||||
std::unique_ptr<SubchannelPicker> picker_; |
||||
}; |
||||
|
||||
// A non-ref-counted wrapper for RefCountedPicker.
|
||||
class RefCountedPickerWrapper : public SubchannelPicker { |
||||
public: |
||||
explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker) |
||||
: picker_(std::move(picker)) {} |
||||
PickResult Pick(PickArgs args) override { return picker_->Pick(args); } |
||||
|
||||
private: |
||||
RefCountedPtr<RefCountedPicker> picker_; |
||||
}; |
||||
|
||||
class Helper : public ChannelControlHelper { |
||||
public: |
||||
explicit Helper(RefCountedPtr<ChildPriority> priority) |
||||
: priority_(std::move(priority)) {} |
||||
|
||||
~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); } |
||||
|
||||
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||
const grpc_channel_args& args) override; |
||||
void UpdateState(grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker) override; |
||||
void RequestReresolution() override; |
||||
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||
|
||||
private: |
||||
RefCountedPtr<ChildPriority> priority_; |
||||
}; |
||||
|
||||
// Methods for dealing with the child policy.
|
||||
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||
const grpc_channel_args* args); |
||||
|
||||
void OnConnectivityStateUpdateLocked( |
||||
grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker); |
||||
|
||||
void StartFailoverTimerLocked(); |
||||
|
||||
static void OnFailoverTimer(void* arg, grpc_error* error); |
||||
void OnFailoverTimerLocked(grpc_error* error); |
||||
static void OnDeactivationTimer(void* arg, grpc_error* error); |
||||
void OnDeactivationTimerLocked(grpc_error* error); |
||||
|
||||
RefCountedPtr<PriorityLb> priority_policy_; |
||||
const std::string name_; |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||
|
||||
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; |
||||
RefCountedPtr<RefCountedPicker> picker_wrapper_; |
||||
|
||||
// States for delayed removal.
|
||||
grpc_timer deactivation_timer_; |
||||
grpc_closure on_deactivation_timer_; |
||||
bool deactivation_timer_callback_pending_ = false; |
||||
|
||||
// States of failover.
|
||||
grpc_timer failover_timer_; |
||||
grpc_closure on_failover_timer_; |
||||
bool failover_timer_callback_pending_ = false; |
||||
}; |
||||
|
||||
~PriorityLb(); |
||||
|
||||
void ShutdownLocked() override; |
||||
|
||||
// Returns UINT32_MAX if child is not in current priority list.
|
||||
uint32_t GetChildPriorityLocked(const std::string& child_name) const; |
||||
|
||||
void HandleChildConnectivityStateChangeLocked(ChildPriority* child); |
||||
void DeleteChild(ChildPriority* child); |
||||
|
||||
void TryNextPriorityLocked(bool report_connecting); |
||||
void SelectPriorityLocked(uint32_t priority); |
||||
|
||||
const int child_failover_timeout_ms_; |
||||
|
||||
// Current channel args and config from the resolver.
|
||||
const grpc_channel_args* args_ = nullptr; |
||||
RefCountedPtr<PriorityLbConfig> config_; |
||||
HierarchicalAddressMap addresses_; |
||||
|
||||
// Internal state.
|
||||
bool shutting_down_ = false; |
||||
|
||||
std::map<std::string, OrphanablePtr<ChildPriority>> children_; |
||||
// The priority that is being used.
|
||||
uint32_t current_priority_ = UINT32_MAX; |
||||
// Points to the current child from before the most recent update.
|
||||
// We will continue to use this child until we decide which of the new
|
||||
// children to use.
|
||||
ChildPriority* current_child_from_before_update_ = nullptr; |
||||
}; |
||||
|
||||
//
|
||||
// PriorityLb
|
||||
//
|
||||
|
||||
PriorityLb::PriorityLb(Args args) |
||||
: LoadBalancingPolicy(std::move(args)), |
||||
child_failover_timeout_ms_(grpc_channel_args_find_integer( |
||||
args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, |
||||
{kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] created", this); |
||||
} |
||||
} |
||||
|
||||
PriorityLb::~PriorityLb() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this); |
||||
} |
||||
grpc_channel_args_destroy(args_); |
||||
} |
||||
|
||||
void PriorityLb::ShutdownLocked() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this); |
||||
} |
||||
shutting_down_ = true; |
||||
children_.clear(); |
||||
} |
||||
|
||||
void PriorityLb::ExitIdleLocked() { |
||||
if (current_priority_ != UINT32_MAX) { |
||||
const std::string& child_name = config_->priorities()[current_priority_]; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] exiting IDLE for current priority %d child %s", |
||||
this, current_priority_, child_name.c_str()); |
||||
} |
||||
children_[child_name]->ExitIdleLocked(); |
||||
} |
||||
} |
||||
|
||||
void PriorityLb::ResetBackoffLocked() { |
||||
for (const auto& p : children_) p.second->ResetBackoffLocked(); |
||||
} |
||||
|
||||
void PriorityLb::UpdateLocked(UpdateArgs args) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] received update", this); |
||||
} |
||||
// Save current child.
|
||||
if (current_priority_ != UINT32_MAX) { |
||||
const std::string& child_name = config_->priorities()[current_priority_]; |
||||
current_child_from_before_update_ = children_[child_name].get(); |
||||
// Unset current_priority_, since it was an index into the old
|
||||
// config's priority list and may no longer be valid. It will be
|
||||
// reset later by TryNextPriorityLocked(), but we unset it here in
|
||||
// case updating any of our children triggers a state update.
|
||||
current_priority_ = UINT32_MAX; |
||||
} |
||||
// Update config.
|
||||
config_ = std::move(args.config); |
||||
// Update args.
|
||||
grpc_channel_args_destroy(args_); |
||||
args_ = args.args; |
||||
args.args = nullptr; |
||||
// Update addresses.
|
||||
addresses_ = MakeHierarchicalAddressMap(args.addresses); |
||||
// Check all existing children against the new config.
|
||||
for (const auto& p : children_) { |
||||
const std::string& child_name = p.first; |
||||
auto& child = p.second; |
||||
auto config_it = config_->children().find(child_name); |
||||
if (config_it == config_->children().end()) { |
||||
// Existing child not found in new config. Deactivate it.
|
||||
child->DeactivateLocked(); |
||||
} else { |
||||
// Existing child found in new config. Update it.
|
||||
child->UpdateLocked(config_it->second); |
||||
} |
||||
} |
||||
// Try to get connected.
|
||||
TryNextPriorityLocked(/*report_connecting=*/children_.empty()); |
||||
} |
||||
|
||||
uint32_t PriorityLb::GetChildPriorityLocked( |
||||
const std::string& child_name) const { |
||||
for (uint32_t priority = 0; priority < config_->priorities().size(); |
||||
++priority) { |
||||
if (config_->priorities()[priority] == child_name) return priority; |
||||
} |
||||
return UINT32_MAX; |
||||
} |
||||
|
||||
void PriorityLb::HandleChildConnectivityStateChangeLocked( |
||||
ChildPriority* child) { |
||||
// Special case for the child that was the current child before the
|
||||
// most recent update.
|
||||
if (child == current_child_from_before_update_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] state update for current child from before " |
||||
"config update", |
||||
this); |
||||
} |
||||
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||
// If it's still READY or IDLE, we stick with this child, so pass
|
||||
// the new picker up to our parent.
|
||||
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||
child->GetPicker()); |
||||
} else { |
||||
// If it's no longer READY or IDLE, we should stop using it.
|
||||
// We already started trying other priorities as a result of the
|
||||
// update, but calling TryNextPriorityLocked() ensures that we will
|
||||
// properly select between CONNECTING and TRANSIENT_FAILURE as the
|
||||
// new state to report to our parent.
|
||||
current_child_from_before_update_ = nullptr; |
||||
TryNextPriorityLocked(/*report_connecting=*/true); |
||||
} |
||||
return; |
||||
} |
||||
// Otherwise, find the child's priority.
|
||||
uint32_t child_priority = GetChildPriorityLocked(child->name()); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] state update for priority %d, child %s", |
||||
this, child_priority, child->name().c_str()); |
||||
} |
||||
// Ignore priorities not in the current config.
|
||||
if (child_priority == UINT32_MAX) return; |
||||
// Ignore lower-than-current priorities.
|
||||
if (child_priority > current_priority_) return; |
||||
// If a child reports TRANSIENT_FAILURE, start trying the next priority.
|
||||
// Note that even if this is for a higher-than-current priority, we
|
||||
// may still need to create some children between this priority and
|
||||
// the current one (e.g., if we got an update that inserted new
|
||||
// priorities ahead of the current one).
|
||||
if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||
TryNextPriorityLocked( |
||||
/*report_connecting=*/child_priority == current_priority_); |
||||
return; |
||||
} |
||||
// The update is for a higher-than-current priority (or for any
|
||||
// priority if we don't have any current priority).
|
||||
if (child_priority < current_priority_) { |
||||
// If the child reports READY or IDLE, switch to that priority.
|
||||
// Otherwise, ignore the update.
|
||||
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||
SelectPriorityLocked(child_priority); |
||||
} |
||||
return; |
||||
} |
||||
// The current priority has returned a new picker, so pass it up to
|
||||
// our parent.
|
||||
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||
child->GetPicker()); |
||||
} |
||||
|
||||
void PriorityLb::DeleteChild(ChildPriority* child) { |
||||
// If this was the current child from before the most recent update,
|
||||
// stop using it. We already started trying other priorities as a
|
||||
// result of the update, but calling TryNextPriorityLocked() ensures that
|
||||
// we will properly select between CONNECTING and TRANSIENT_FAILURE as the
|
||||
// new state to report to our parent.
|
||||
if (current_child_from_before_update_ == child) { |
||||
current_child_from_before_update_ = nullptr; |
||||
TryNextPriorityLocked(/*report_connecting=*/true); |
||||
} |
||||
children_.erase(child->name()); |
||||
} |
||||
|
||||
void PriorityLb::TryNextPriorityLocked(bool report_connecting) { |
||||
for (uint32_t priority = 0; priority < config_->priorities().size(); |
||||
++priority) { |
||||
// If the child for the priority does not exist yet, create it.
|
||||
const std::string& child_name = config_->priorities()[priority]; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] trying priority %d, child %s", this, |
||||
priority, child_name.c_str()); |
||||
} |
||||
auto& child = children_[child_name]; |
||||
if (child == nullptr) { |
||||
if (report_connecting) { |
||||
channel_control_helper()->UpdateState( |
||||
GRPC_CHANNEL_CONNECTING, |
||||
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"))); |
||||
} |
||||
child = MakeOrphanable<ChildPriority>( |
||||
Ref(DEBUG_LOCATION, "ChildPriority"), child_name); |
||||
child->UpdateLocked(config_->children().find(child_name)->second); |
||||
return; |
||||
} |
||||
// The child already exists.
|
||||
child->MaybeReactivateLocked(); |
||||
// If the child is in state READY or IDLE, switch to it.
|
||||
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||
SelectPriorityLocked(priority); |
||||
return; |
||||
} |
||||
// Child is not READY or IDLE.
|
||||
// If its failover timer is still pending, give it time to fire.
|
||||
if (child->failover_timer_callback_pending()) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] priority %d, child %s: child still " |
||||
"attempting to connect, will wait", |
||||
this, priority, child_name.c_str()); |
||||
} |
||||
if (report_connecting) { |
||||
channel_control_helper()->UpdateState( |
||||
GRPC_CHANNEL_CONNECTING, |
||||
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"))); |
||||
} |
||||
return; |
||||
} |
||||
// Child has been failing for a while. Move on to the next priority.
|
||||
} |
||||
// If there are no more priorities to try, report TRANSIENT_FAILURE.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] no priority reachable, putting channel in " |
||||
"TRANSIENT_FAILURE", |
||||
this); |
||||
} |
||||
current_priority_ = UINT32_MAX; |
||||
current_child_from_before_update_ = nullptr; |
||||
grpc_error* error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"), |
||||
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
||||
channel_control_helper()->UpdateState( |
||||
GRPC_CHANNEL_TRANSIENT_FAILURE, |
||||
absl::make_unique<TransientFailurePicker>(error)); |
||||
} |
||||
|
||||
void PriorityLb::SelectPriorityLocked(uint32_t priority) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] selected priority %d, child %s", this, |
||||
priority, config_->priorities()[priority].c_str()); |
||||
} |
||||
current_priority_ = priority; |
||||
current_child_from_before_update_ = nullptr; |
||||
// Deactivate lower priorities.
|
||||
for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) { |
||||
const std::string& child_name = config_->priorities()[p]; |
||||
auto it = children_.find(child_name); |
||||
if (it != children_.end()) it->second->DeactivateLocked(); |
||||
} |
||||
// Update picker.
|
||||
auto& child = children_[config_->priorities()[priority]]; |
||||
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||
child->GetPicker()); |
||||
} |
||||
|
||||
//
|
||||
// PriorityLb::ChildPriority
|
||||
//
|
||||
|
||||
PriorityLb::ChildPriority::ChildPriority( |
||||
RefCountedPtr<PriorityLb> priority_policy, std::string name) |
||||
: priority_policy_(std::move(priority_policy)), name_(std::move(name)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Start the failover timer.
|
||||
StartFailoverTimerLocked(); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::Orphan() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
MaybeCancelFailoverTimerLocked(); |
||||
if (deactivation_timer_callback_pending_) { |
||||
grpc_timer_cancel(&deactivation_timer_); |
||||
} |
||||
// Remove the child policy's interested_parties pollset_set from the
|
||||
// xDS policy.
|
||||
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
||||
priority_policy_->interested_parties()); |
||||
child_policy_.reset(); |
||||
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||
// the child.
|
||||
picker_wrapper_.reset(); |
||||
if (deactivation_timer_callback_pending_) { |
||||
grpc_timer_cancel(&deactivation_timer_); |
||||
} |
||||
Unref(DEBUG_LOCATION, "ChildPriority+Orphan"); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::UpdateLocked( |
||||
RefCountedPtr<LoadBalancingPolicy::Config> config) { |
||||
if (priority_policy_->shutting_down_) return; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
// Create policy if needed.
|
||||
if (child_policy_ == nullptr) { |
||||
child_policy_ = CreateChildPolicyLocked(priority_policy_->args_); |
||||
} |
||||
// Construct update args.
|
||||
UpdateArgs update_args; |
||||
update_args.config = std::move(config); |
||||
update_args.addresses = priority_policy_->addresses_[name_]; |
||||
update_args.args = grpc_channel_args_copy(priority_policy_->args_); |
||||
// Update the policy.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): updating child policy handler %p", |
||||
priority_policy_.get(), name_.c_str(), this, child_policy_.get()); |
||||
} |
||||
child_policy_->UpdateLocked(std::move(update_args)); |
||||
} |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> |
||||
PriorityLb::ChildPriority::CreateChildPolicyLocked( |
||||
const grpc_channel_args* args) { |
||||
LoadBalancingPolicy::Args lb_policy_args; |
||||
lb_policy_args.work_serializer = priority_policy_->work_serializer(); |
||||
lb_policy_args.args = args; |
||||
lb_policy_args.channel_control_helper = |
||||
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
||||
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||
&grpc_lb_priority_trace); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): created new child policy " |
||||
"handler %p", |
||||
priority_policy_.get(), name_.c_str(), this, lb_policy.get()); |
||||
} |
||||
// Add the parent's interested_parties pollset_set to that of the newly
|
||||
// created child policy. This will make the child policy progress upon
|
||||
// activity on the parent LB, which in turn is tied to the application's call.
|
||||
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
||||
priority_policy_->interested_parties()); |
||||
return lb_policy; |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::ExitIdleLocked() { |
||||
if (connectivity_state_ == GRPC_CHANNEL_IDLE && |
||||
!failover_timer_callback_pending_) { |
||||
StartFailoverTimerLocked(); |
||||
} |
||||
child_policy_->ExitIdleLocked(); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::ResetBackoffLocked() { |
||||
child_policy_->ResetBackoffLocked(); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked( |
||||
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): state update: %s, picker %p", |
||||
priority_policy_.get(), name_.c_str(), this, |
||||
ConnectivityStateName(state), picker.get()); |
||||
} |
||||
// Store the state and picker.
|
||||
connectivity_state_ = state; |
||||
picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker)); |
||||
// If READY or TRANSIENT_FAILURE, cancel failover timer.
|
||||
if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||
MaybeCancelFailoverTimerLocked(); |
||||
} |
||||
// Notify the parent policy.
|
||||
priority_policy_->HandleChildConnectivityStateChangeLocked(this); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::StartFailoverTimerLocked() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): starting failover timer for %d ms", |
||||
priority_policy_.get(), name_.c_str(), this, |
||||
priority_policy_->child_failover_timeout_ms_); |
||||
} |
||||
Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release(); |
||||
grpc_timer_init( |
||||
&failover_timer_, |
||||
ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_, |
||||
&on_failover_timer_); |
||||
failover_timer_callback_pending_ = true; |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() { |
||||
if (failover_timer_callback_pending_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): cancelling failover timer", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
grpc_timer_cancel(&failover_timer_); |
||||
failover_timer_callback_pending_ = false; |
||||
} |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) { |
||||
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||
GRPC_ERROR_REF(error); // ref owned by lambda
|
||||
self->priority_policy_->work_serializer()->Run( |
||||
[self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) { |
||||
if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ && |
||||
!priority_policy_->shutting_down_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): failover timer fired, " |
||||
"reporting TRANSIENT_FAILURE", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
failover_timer_callback_pending_ = false; |
||||
OnConnectivityStateUpdateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, nullptr); |
||||
} |
||||
Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked"); |
||||
GRPC_ERROR_UNREF(error); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::DeactivateLocked() { |
||||
// If already deactivated, don't do it again.
|
||||
if (deactivation_timer_callback_pending_) return; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): deactivating -- will remove in %d " |
||||
"ms.", |
||||
priority_policy_.get(), name_.c_str(), this, |
||||
kChildRetentionIntervalMs); |
||||
} |
||||
MaybeCancelFailoverTimerLocked(); |
||||
// Start a timer to delete the child.
|
||||
Ref(DEBUG_LOCATION, "ChildPriority+timer").release(); |
||||
grpc_timer_init(&deactivation_timer_, |
||||
ExecCtx::Get()->Now() + kChildRetentionIntervalMs, |
||||
&on_deactivation_timer_); |
||||
deactivation_timer_callback_pending_ = true; |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::MaybeReactivateLocked() { |
||||
if (deactivation_timer_callback_pending_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
deactivation_timer_callback_pending_ = false; |
||||
grpc_timer_cancel(&deactivation_timer_); |
||||
} |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg, |
||||
grpc_error* error) { |
||||
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||
GRPC_ERROR_REF(error); // ref owned by lambda
|
||||
self->priority_policy_->work_serializer()->Run( |
||||
[self, error]() { self->OnDeactivationTimerLocked(error); }, |
||||
DEBUG_LOCATION); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) { |
||||
if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ && |
||||
!priority_policy_->shutting_down_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[priority_lb %p] child %s (%p): deactivation timer fired, " |
||||
"deleting child", |
||||
priority_policy_.get(), name_.c_str(), this); |
||||
} |
||||
deactivation_timer_callback_pending_ = false; |
||||
priority_policy_->DeleteChild(this); |
||||
} |
||||
Unref(DEBUG_LOCATION, "ChildPriority+timer"); |
||||
GRPC_ERROR_UNREF(error); |
||||
} |
||||
|
||||
//
|
||||
// PriorityLb::ChildPriority::Helper
|
||||
//
|
||||
|
||||
void PriorityLb::ChildPriority::Helper::RequestReresolution() { |
||||
if (priority_->priority_policy_->shutting_down_) return; |
||||
priority_->priority_policy_->channel_control_helper()->RequestReresolution(); |
||||
} |
||||
|
||||
RefCountedPtr<SubchannelInterface> |
||||
PriorityLb::ChildPriority::Helper::CreateSubchannel( |
||||
const grpc_channel_args& args) { |
||||
if (priority_->priority_policy_->shutting_down_) return nullptr; |
||||
return priority_->priority_policy_->channel_control_helper() |
||||
->CreateSubchannel(args); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::Helper::UpdateState( |
||||
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||
if (priority_->priority_policy_->shutting_down_) return; |
||||
// Notify the priority.
|
||||
priority_->OnConnectivityStateUpdateLocked(state, std::move(picker)); |
||||
} |
||||
|
||||
void PriorityLb::ChildPriority::Helper::AddTraceEvent(TraceSeverity severity, |
||||
StringView message) { |
||||
if (priority_->priority_policy_->shutting_down_) return; |
||||
priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity, |
||||
message); |
||||
} |
||||
|
||||
//
|
||||
// factory
|
||||
//
|
||||
|
||||
class PriorityLbFactory : public LoadBalancingPolicyFactory { |
||||
public: |
||||
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||
LoadBalancingPolicy::Args args) const override { |
||||
return MakeOrphanable<PriorityLb>(std::move(args)); |
||||
} |
||||
|
||||
const char* name() const override { return kPriority; } |
||||
|
||||
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||
const Json& json, grpc_error** error) const override { |
||||
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||
if (json.type() == Json::Type::JSON_NULL) { |
||||
// priority was mentioned as a policy in the deprecated
|
||||
// loadBalancingPolicy field or in the client API.
|
||||
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:loadBalancingPolicy error:priority policy requires " |
||||
"configuration. Please use loadBalancingConfig field of service " |
||||
"config instead."); |
||||
return nullptr; |
||||
} |
||||
std::vector<grpc_error*> error_list; |
||||
// Children.
|
||||
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> children; |
||||
auto it = json.object_value().find("children"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:children error:required field missing")); |
||||
} else if (it->second.type() != Json::Type::OBJECT) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:children error:type should be object")); |
||||
} else { |
||||
const Json::Object& object = it->second.object_value(); |
||||
for (const auto& p : object) { |
||||
const std::string& child_name = p.first; |
||||
const Json& element = p.second; |
||||
if (element.type() != Json::Type::OBJECT) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:children key:", child_name, |
||||
" error:should be type object") |
||||
.c_str())); |
||||
} else { |
||||
auto it2 = element.object_value().find("config"); |
||||
if (it2 == element.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:children key:", child_name, |
||||
" error:missing 'config' field") |
||||
.c_str())); |
||||
} else { |
||||
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||
auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
||||
it2->second, &parse_error); |
||||
if (config == nullptr) { |
||||
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||
error_list.push_back( |
||||
GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( |
||||
absl::StrCat("field:children key:", child_name).c_str(), |
||||
&parse_error, 1)); |
||||
GRPC_ERROR_UNREF(parse_error); |
||||
} |
||||
children[child_name] = std::move(config); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
// Priorities.
|
||||
std::vector<std::string> priorities; |
||||
it = json.object_value().find("priorities"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:priorities error:required field missing")); |
||||
} else if (it->second.type() != Json::Type::ARRAY) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:priorities error:type should be array")); |
||||
} else { |
||||
const Json::Array& array = it->second.array_value(); |
||||
for (size_t i = 0; i < array.size(); ++i) { |
||||
const Json& element = array[i]; |
||||
if (element.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:priorities element:", i, |
||||
" error:should be type string") |
||||
.c_str())); |
||||
} else if (children.find(element.string_value()) == children.end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:priorities element:", i, |
||||
" error:unknown child '", element.string_value(), |
||||
"'") |
||||
.c_str())); |
||||
} else { |
||||
priorities.emplace_back(element.string_value()); |
||||
} |
||||
} |
||||
if (priorities.size() != children.size()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:priorities error:priorities size (", |
||||
priorities.size(), ") != children size (", |
||||
children.size(), ")") |
||||
.c_str())); |
||||
} |
||||
} |
||||
if (error_list.empty()) { |
||||
return MakeRefCounted<PriorityLbConfig>(std::move(children), |
||||
std::move(priorities)); |
||||
} else { |
||||
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||
"priority_experimental LB policy config", &error_list); |
||||
return nullptr; |
||||
} |
||||
} |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
//
|
||||
// Plugin registration
|
||||
//
|
||||
|
||||
void grpc_lb_policy_priority_init() { |
||||
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||
RegisterLoadBalancingPolicyFactory( |
||||
absl::make_unique<grpc_core::PriorityLbFactory>()); |
||||
} |
||||
|
||||
void grpc_lb_policy_priority_shutdown() {} |
@ -0,0 +1,721 @@ |
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <inttypes.h> |
||||
#include <limits.h> |
||||
#include <string.h> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gpr/string.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/iomgr/work_serializer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); |
||||
|
||||
namespace { |
||||
|
||||
constexpr char kWeightedTarget[] = "weighted_target_experimental"; |
||||
|
||||
// How long we keep a child around for after it has been removed from
|
||||
// the config.
|
||||
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000; |
||||
|
||||
// Config for weighted_target LB policy.
|
||||
class WeightedTargetLbConfig : public LoadBalancingPolicy::Config { |
||||
public: |
||||
struct ChildConfig { |
||||
uint32_t weight; |
||||
RefCountedPtr<LoadBalancingPolicy::Config> config; |
||||
}; |
||||
|
||||
using TargetMap = std::map<std::string, ChildConfig>; |
||||
|
||||
explicit WeightedTargetLbConfig(TargetMap target_map) |
||||
: target_map_(std::move(target_map)) {} |
||||
|
||||
const char* name() const override { return kWeightedTarget; } |
||||
|
||||
const TargetMap& target_map() const { return target_map_; } |
||||
|
||||
private: |
||||
TargetMap target_map_; |
||||
}; |
||||
|
||||
// weighted_target LB policy.
|
||||
class WeightedTargetLb : public LoadBalancingPolicy { |
||||
public: |
||||
explicit WeightedTargetLb(Args args); |
||||
|
||||
const char* name() const override { return kWeightedTarget; } |
||||
|
||||
void UpdateLocked(UpdateArgs args) override; |
||||
void ResetBackoffLocked() override; |
||||
|
||||
private: |
||||
// A simple wrapper for ref-counting a picker from the child policy.
|
||||
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> { |
||||
public: |
||||
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker) |
||||
: picker_(std::move(picker)) {} |
||||
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||
|
||||
private: |
||||
std::unique_ptr<SubchannelPicker> picker_; |
||||
}; |
||||
|
||||
// Picks a child using stateless WRR and then delegates to that
|
||||
// child's picker.
|
||||
class WeightedPicker : public SubchannelPicker { |
||||
public: |
||||
// Maintains a weighted list of pickers from each child that is in
|
||||
// ready state. The first element in the pair represents the end of a
|
||||
// range proportional to the child's weight. The start of the range
|
||||
// is the previous value in the vector and is 0 for the first element.
|
||||
using PickerList = |
||||
InlinedVector<std::pair<uint32_t, RefCountedPtr<ChildPickerWrapper>>, |
||||
1>; |
||||
|
||||
explicit WeightedPicker(PickerList pickers) |
||||
: pickers_(std::move(pickers)) {} |
||||
|
||||
PickResult Pick(PickArgs args) override; |
||||
|
||||
private: |
||||
PickerList pickers_; |
||||
}; |
||||
|
||||
// Each WeightedChild holds a ref to its parent WeightedTargetLb.
|
||||
class WeightedChild : public InternallyRefCounted<WeightedChild> { |
||||
public: |
||||
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy, |
||||
const std::string& name); |
||||
~WeightedChild(); |
||||
|
||||
void Orphan() override; |
||||
|
||||
void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, |
||||
ServerAddressList addresses, |
||||
const grpc_channel_args* args); |
||||
void ResetBackoffLocked(); |
||||
void DeactivateLocked(); |
||||
|
||||
uint32_t weight() const { return weight_; } |
||||
grpc_connectivity_state connectivity_state() const { |
||||
return connectivity_state_; |
||||
} |
||||
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const { |
||||
return picker_wrapper_; |
||||
} |
||||
|
||||
private: |
||||
class Helper : public ChannelControlHelper { |
||||
public: |
||||
explicit Helper(RefCountedPtr<WeightedChild> weighted_child) |
||||
: weighted_child_(std::move(weighted_child)) {} |
||||
|
||||
~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); } |
||||
|
||||
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||
const grpc_channel_args& args) override; |
||||
void UpdateState(grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker) override; |
||||
void RequestReresolution() override; |
||||
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||
|
||||
private: |
||||
RefCountedPtr<WeightedChild> weighted_child_; |
||||
}; |
||||
|
||||
// Methods for dealing with the child policy.
|
||||
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||
const grpc_channel_args* args); |
||||
|
||||
void OnConnectivityStateUpdateLocked( |
||||
grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker); |
||||
|
||||
static void OnDelayedRemovalTimer(void* arg, grpc_error* error); |
||||
void OnDelayedRemovalTimerLocked(grpc_error* error); |
||||
|
||||
// The owning LB policy.
|
||||
RefCountedPtr<WeightedTargetLb> weighted_target_policy_; |
||||
|
||||
const std::string& name_; |
||||
|
||||
uint32_t weight_; |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||
|
||||
RefCountedPtr<ChildPickerWrapper> picker_wrapper_; |
||||
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; |
||||
bool seen_failure_since_ready_ = false; |
||||
|
||||
// States for delayed removal.
|
||||
grpc_timer delayed_removal_timer_; |
||||
grpc_closure on_delayed_removal_timer_; |
||||
bool delayed_removal_timer_callback_pending_ = false; |
||||
bool shutdown_ = false; |
||||
}; |
||||
|
||||
~WeightedTargetLb(); |
||||
|
||||
void ShutdownLocked() override; |
||||
|
||||
void UpdateStateLocked(); |
||||
|
||||
// Current config from the resolver.
|
||||
RefCountedPtr<WeightedTargetLbConfig> config_; |
||||
|
||||
// Internal state.
|
||||
bool shutting_down_ = false; |
||||
|
||||
// Children.
|
||||
std::map<std::string, OrphanablePtr<WeightedChild>> targets_; |
||||
}; |
||||
|
||||
//
|
||||
// WeightedTargetLb::WeightedPicker
|
||||
//
|
||||
|
||||
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( |
||||
PickArgs args) { |
||||
// Generate a random number in [0, total weight).
|
||||
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; |
||||
// Find the index in pickers_ corresponding to key.
|
||||
size_t mid = 0; |
||||
size_t start_index = 0; |
||||
size_t end_index = pickers_.size() - 1; |
||||
size_t index = 0; |
||||
while (end_index > start_index) { |
||||
mid = (start_index + end_index) / 2; |
||||
if (pickers_[mid].first > key) { |
||||
end_index = mid; |
||||
} else if (pickers_[mid].first < key) { |
||||
start_index = mid + 1; |
||||
} else { |
||||
index = mid + 1; |
||||
break; |
||||
} |
||||
} |
||||
if (index == 0) index = start_index; |
||||
GPR_ASSERT(pickers_[index].first > key); |
||||
// Delegate to the child picker.
|
||||
return pickers_[index].second->Pick(args); |
||||
} |
||||
|
||||
//
|
||||
// WeightedTargetLb
|
||||
//
|
||||
|
||||
WeightedTargetLb::WeightedTargetLb(Args args) |
||||
: LoadBalancingPolicy(std::move(args)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this); |
||||
} |
||||
} |
||||
|
||||
WeightedTargetLb::~WeightedTargetLb() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] destroying weighted_target LB policy", |
||||
this); |
||||
} |
||||
} |
||||
|
||||
void WeightedTargetLb::ShutdownLocked() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this); |
||||
} |
||||
shutting_down_ = true; |
||||
targets_.clear(); |
||||
} |
||||
|
||||
void WeightedTargetLb::ResetBackoffLocked() { |
||||
for (auto& p : targets_) p.second->ResetBackoffLocked(); |
||||
} |
||||
|
||||
void WeightedTargetLb::UpdateLocked(UpdateArgs args) { |
||||
if (shutting_down_) return; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); |
||||
} |
||||
// Update config.
|
||||
config_ = std::move(args.config); |
||||
// Deactivate the targets not in the new config.
|
||||
for (const auto& p : targets_) { |
||||
const std::string& name = p.first; |
||||
WeightedChild* child = p.second.get(); |
||||
if (config_->target_map().find(name) == config_->target_map().end()) { |
||||
child->DeactivateLocked(); |
||||
} |
||||
} |
||||
// Add or update the targets in the new config.
|
||||
HierarchicalAddressMap address_map = |
||||
MakeHierarchicalAddressMap(args.addresses); |
||||
for (const auto& p : config_->target_map()) { |
||||
const std::string& name = p.first; |
||||
const WeightedTargetLbConfig::ChildConfig& config = p.second; |
||||
auto it = targets_.find(name); |
||||
if (it == targets_.end()) { |
||||
it = targets_.emplace(std::make_pair(name, nullptr)).first; |
||||
it->second = MakeOrphanable<WeightedChild>( |
||||
Ref(DEBUG_LOCATION, "WeightedChild"), it->first); |
||||
} |
||||
it->second->UpdateLocked(config, std::move(address_map[name]), args.args); |
||||
} |
||||
} |
||||
|
||||
void WeightedTargetLb::UpdateStateLocked() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] scanning children to determine " |
||||
"connectivity state", |
||||
this); |
||||
} |
||||
// Construct a new picker which maintains a map of all child pickers
|
||||
// that are ready. Each child is represented by a portion of the range
|
||||
// proportional to its weight, such that the total range is the sum of the
|
||||
// weights of all children.
|
||||
WeightedPicker::PickerList picker_list; |
||||
uint32_t end = 0; |
||||
// Also count the number of children in each state, to determine the
|
||||
// overall state.
|
||||
size_t num_connecting = 0; |
||||
size_t num_idle = 0; |
||||
size_t num_transient_failures = 0; |
||||
for (const auto& p : targets_) { |
||||
const std::string& child_name = p.first; |
||||
const WeightedChild* child = p.second.get(); |
||||
// Skip the targets that are not in the latest update.
|
||||
if (config_->target_map().find(child_name) == config_->target_map().end()) { |
||||
continue; |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] child=%s state=%s weight=%d picker=%p", |
||||
this, child_name.c_str(), |
||||
ConnectivityStateName(child->connectivity_state()), |
||||
child->weight(), child->picker_wrapper().get()); |
||||
} |
||||
switch (child->connectivity_state()) { |
||||
case GRPC_CHANNEL_READY: { |
||||
end += child->weight(); |
||||
picker_list.push_back(std::make_pair(end, child->picker_wrapper())); |
||||
break; |
||||
} |
||||
case GRPC_CHANNEL_CONNECTING: { |
||||
++num_connecting; |
||||
break; |
||||
} |
||||
case GRPC_CHANNEL_IDLE: { |
||||
++num_idle; |
||||
break; |
||||
} |
||||
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
||||
++num_transient_failures; |
||||
break; |
||||
} |
||||
default: |
||||
GPR_UNREACHABLE_CODE(return ); |
||||
} |
||||
} |
||||
// Determine aggregated connectivity state.
|
||||
grpc_connectivity_state connectivity_state; |
||||
if (!picker_list.empty()) { |
||||
connectivity_state = GRPC_CHANNEL_READY; |
||||
} else if (num_connecting > 0) { |
||||
connectivity_state = GRPC_CHANNEL_CONNECTING; |
||||
} else if (num_idle > 0) { |
||||
connectivity_state = GRPC_CHANNEL_IDLE; |
||||
} else { |
||||
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s", |
||||
this, ConnectivityStateName(connectivity_state)); |
||||
} |
||||
std::unique_ptr<SubchannelPicker> picker; |
||||
switch (connectivity_state) { |
||||
case GRPC_CHANNEL_READY: |
||||
picker = absl::make_unique<WeightedPicker>(std::move(picker_list)); |
||||
break; |
||||
case GRPC_CHANNEL_CONNECTING: |
||||
case GRPC_CHANNEL_IDLE: |
||||
picker = |
||||
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")); |
||||
break; |
||||
default: |
||||
picker = absl::make_unique<TransientFailurePicker>( |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"weighted_target: all children report state TRANSIENT_FAILURE")); |
||||
} |
||||
channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); |
||||
} |
||||
|
||||
//
|
||||
// WeightedTargetLb::WeightedChild
|
||||
//
|
||||
|
||||
WeightedTargetLb::WeightedChild::WeightedChild( |
||||
RefCountedPtr<WeightedTargetLb> weighted_target_policy, |
||||
const std::string& name) |
||||
: weighted_target_policy_(std::move(weighted_target_policy)), name_(name) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", |
||||
weighted_target_policy_.get(), this, name_.c_str()); |
||||
} |
||||
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
||||
grpc_schedule_on_exec_ctx); |
||||
} |
||||
|
||||
WeightedTargetLb::WeightedChild::~WeightedChild() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: destroying child", |
||||
weighted_target_policy_.get(), this, name_.c_str()); |
||||
} |
||||
weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild"); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::Orphan() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: shutting down child", |
||||
weighted_target_policy_.get(), this, name_.c_str()); |
||||
} |
||||
// Remove the child policy's interested_parties pollset_set from the
|
||||
// xDS policy.
|
||||
grpc_pollset_set_del_pollset_set( |
||||
child_policy_->interested_parties(), |
||||
weighted_target_policy_->interested_parties()); |
||||
child_policy_.reset(); |
||||
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||
// the child.
|
||||
picker_wrapper_.reset(); |
||||
if (delayed_removal_timer_callback_pending_) { |
||||
delayed_removal_timer_callback_pending_ = false; |
||||
grpc_timer_cancel(&delayed_removal_timer_); |
||||
} |
||||
shutdown_ = true; |
||||
Unref(); |
||||
} |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> |
||||
WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( |
||||
const grpc_channel_args* args) { |
||||
LoadBalancingPolicy::Args lb_policy_args; |
||||
lb_policy_args.work_serializer = weighted_target_policy_->work_serializer(); |
||||
lb_policy_args.args = args; |
||||
lb_policy_args.channel_control_helper = |
||||
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
||||
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||
&grpc_lb_weighted_target_trace); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: Created new child " |
||||
"policy handler %p", |
||||
weighted_target_policy_.get(), this, name_.c_str(), |
||||
lb_policy.get()); |
||||
} |
||||
// Add the xDS's interested_parties pollset_set to that of the newly created
|
||||
// child policy. This will make the child policy progress upon activity on
|
||||
// xDS LB, which in turn is tied to the application's call.
|
||||
grpc_pollset_set_add_pollset_set( |
||||
lb_policy->interested_parties(), |
||||
weighted_target_policy_->interested_parties()); |
||||
return lb_policy; |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::UpdateLocked( |
||||
const WeightedTargetLbConfig::ChildConfig& config, |
||||
ServerAddressList addresses, const grpc_channel_args* args) { |
||||
if (weighted_target_policy_->shutting_down_) return; |
||||
// Update child weight.
|
||||
weight_ = config.weight; |
||||
// Reactivate if needed.
|
||||
if (delayed_removal_timer_callback_pending_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: reactivating", |
||||
weighted_target_policy_.get(), this, name_.c_str()); |
||||
} |
||||
delayed_removal_timer_callback_pending_ = false; |
||||
grpc_timer_cancel(&delayed_removal_timer_); |
||||
} |
||||
// Create child policy if needed.
|
||||
if (child_policy_ == nullptr) { |
||||
child_policy_ = CreateChildPolicyLocked(args); |
||||
} |
||||
// Construct update args.
|
||||
UpdateArgs update_args; |
||||
update_args.config = config.config; |
||||
update_args.addresses = std::move(addresses); |
||||
update_args.args = grpc_channel_args_copy(args); |
||||
// Update the policy.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: Updating child " |
||||
"policy handler %p", |
||||
weighted_target_policy_.get(), this, name_.c_str(), |
||||
child_policy_.get()); |
||||
} |
||||
child_policy_->UpdateLocked(std::move(update_args)); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { |
||||
child_policy_->ResetBackoffLocked(); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked( |
||||
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||
// Cache the picker in the WeightedChild.
|
||||
picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker)); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: connectivity " |
||||
"state update: state=%s picker_wrapper=%p", |
||||
weighted_target_policy_.get(), this, name_.c_str(), |
||||
ConnectivityStateName(state), picker_wrapper_.get()); |
||||
} |
||||
// If the child reports IDLE, immediately tell it to exit idle.
|
||||
if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked(); |
||||
// Decide what state to report for aggregation purposes.
|
||||
// If we haven't seen a failure since the last time we were in state
|
||||
// READY, then we report the state change as-is. However, once we do see
|
||||
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
|
||||
// changes until we go back into state READY.
|
||||
if (!seen_failure_since_ready_) { |
||||
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||
seen_failure_since_ready_ = true; |
||||
} |
||||
} else { |
||||
if (state != GRPC_CHANNEL_READY) return; |
||||
seen_failure_since_ready_ = false; |
||||
} |
||||
connectivity_state_ = state; |
||||
// Notify the LB policy.
|
||||
weighted_target_policy_->UpdateStateLocked(); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::DeactivateLocked() { |
||||
// If already deactivated, don't do that again.
|
||||
if (weight_ == 0) return; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[weighted_target_lb %p] WeightedChild %p %s: deactivating", |
||||
weighted_target_policy_.get(), this, name_.c_str()); |
||||
} |
||||
// Set the child weight to 0 so that future picker won't contain this child.
|
||||
weight_ = 0; |
||||
// Start a timer to delete the child.
|
||||
Ref(DEBUG_LOCATION, "WeightedChild+timer").release(); |
||||
delayed_removal_timer_callback_pending_ = true; |
||||
grpc_timer_init(&delayed_removal_timer_, |
||||
ExecCtx::Get()->Now() + kChildRetentionIntervalMs, |
||||
&on_delayed_removal_timer_); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg, |
||||
grpc_error* error) { |
||||
WeightedChild* self = static_cast<WeightedChild*>(arg); |
||||
GRPC_ERROR_REF(error); // ref owned by lambda
|
||||
self->weighted_target_policy_->work_serializer()->Run( |
||||
[self, error]() { self->OnDelayedRemovalTimerLocked(error); }, |
||||
DEBUG_LOCATION); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked( |
||||
grpc_error* error) { |
||||
if (error == GRPC_ERROR_NONE && delayed_removal_timer_callback_pending_ && |
||||
!shutdown_ && weight_ == 0) { |
||||
delayed_removal_timer_callback_pending_ = false; |
||||
weighted_target_policy_->targets_.erase(name_); |
||||
} |
||||
Unref(DEBUG_LOCATION, "WeightedChild+timer"); |
||||
GRPC_ERROR_UNREF(error); |
||||
} |
||||
|
||||
//
|
||||
// WeightedTargetLb::WeightedChild::Helper
|
||||
//
|
||||
|
||||
RefCountedPtr<SubchannelInterface> |
||||
WeightedTargetLb::WeightedChild::Helper::CreateSubchannel( |
||||
const grpc_channel_args& args) { |
||||
if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr; |
||||
return weighted_child_->weighted_target_policy_->channel_control_helper() |
||||
->CreateSubchannel(args); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::Helper::UpdateState( |
||||
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||
weighted_child_->OnConnectivityStateUpdateLocked(state, std::move(picker)); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() { |
||||
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||
weighted_child_->weighted_target_policy_->channel_control_helper() |
||||
->RequestReresolution(); |
||||
} |
||||
|
||||
void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent( |
||||
TraceSeverity severity, StringView message) { |
||||
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||
weighted_child_->weighted_target_policy_->channel_control_helper() |
||||
->AddTraceEvent(severity, message); |
||||
} |
||||
|
||||
//
|
||||
// factory
|
||||
//
|
||||
|
||||
class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { |
||||
public: |
||||
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||
LoadBalancingPolicy::Args args) const override { |
||||
return MakeOrphanable<WeightedTargetLb>(std::move(args)); |
||||
} |
||||
|
||||
const char* name() const override { return kWeightedTarget; } |
||||
|
||||
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||
const Json& json, grpc_error** error) const override { |
||||
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||
if (json.type() == Json::Type::JSON_NULL) { |
||||
// weighted_target was mentioned as a policy in the deprecated
|
||||
// loadBalancingPolicy field or in the client API.
|
||||
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:loadBalancingPolicy error:weighted_target policy requires " |
||||
"configuration. Please use loadBalancingConfig field of service " |
||||
"config instead."); |
||||
return nullptr; |
||||
} |
||||
std::vector<grpc_error*> error_list; |
||||
// Weight map.
|
||||
WeightedTargetLbConfig::TargetMap target_map; |
||||
auto it = json.object_value().find("targets"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:targets error:required field not present")); |
||||
} else if (it->second.type() != Json::Type::OBJECT) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:targets error:type should be object")); |
||||
} else { |
||||
for (const auto& p : it->second.object_value()) { |
||||
WeightedTargetLbConfig::ChildConfig child_config; |
||||
std::vector<grpc_error*> child_errors = |
||||
ParseChildConfig(p.second, &child_config); |
||||
if (!child_errors.empty()) { |
||||
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
|
||||
// string is not static in this case.
|
||||
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||
absl::StrCat("field:targets key:", p.first).c_str()); |
||||
for (grpc_error* child_error : child_errors) { |
||||
error = grpc_error_add_child(error, child_error); |
||||
} |
||||
error_list.push_back(error); |
||||
} else { |
||||
target_map[p.first] = std::move(child_config); |
||||
} |
||||
} |
||||
} |
||||
if (!error_list.empty()) { |
||||
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||
"weighted_target_experimental LB policy config", &error_list); |
||||
return nullptr; |
||||
} |
||||
return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map)); |
||||
} |
||||
|
||||
private: |
||||
static std::vector<grpc_error*> ParseChildConfig( |
||||
const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) { |
||||
std::vector<grpc_error*> error_list; |
||||
if (json.type() != Json::Type::OBJECT) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"value should be of type object")); |
||||
return error_list; |
||||
} |
||||
// Weight.
|
||||
auto it = json.object_value().find("weight"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"required field \"weight\" not specified")); |
||||
} else if (it->second.type() != Json::Type::NUMBER) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:weight error:must be of type number")); |
||||
} else { |
||||
child_config->weight = |
||||
gpr_parse_nonnegative_int(it->second.string_value().c_str()); |
||||
if (child_config->weight == -1) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:weight error:unparseable value")); |
||||
} else if (child_config->weight == 0) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:weight error:value must be greater than zero")); |
||||
} |
||||
} |
||||
// Child policy.
|
||||
it = json.object_value().find("childPolicy"); |
||||
if (it != json.object_value().end()) { |
||||
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||
child_config->config = |
||||
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second, |
||||
&parse_error); |
||||
if (child_config->config == nullptr) { |
||||
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||
std::vector<grpc_error*> child_errors; |
||||
child_errors.push_back(parse_error); |
||||
error_list.push_back( |
||||
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
||||
} |
||||
} |
||||
return error_list; |
||||
} |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
//
|
||||
// Plugin registration
|
||||
//
|
||||
|
||||
void grpc_lb_policy_weighted_target_init() { |
||||
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||
RegisterLoadBalancingPolicyFactory( |
||||
absl::make_unique<grpc_core::WeightedTargetLbFactory>()); |
||||
} |
||||
|
||||
void grpc_lb_policy_weighted_target_shutdown() {} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,524 @@ |
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||
#include "src/core/ext/filters/client_channel/xds/xds_client.h" |
||||
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/work_serializer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_lb_lrs_trace(false, "lrs_lb"); |
||||
|
||||
namespace { |
||||
|
||||
constexpr char kLrs[] = "lrs_experimental"; |
||||
|
||||
// Config for LRS LB policy.
|
||||
class LrsLbConfig : public LoadBalancingPolicy::Config { |
||||
public: |
||||
LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy, |
||||
std::string cluster_name, std::string eds_service_name, |
||||
std::string lrs_load_reporting_server_name, |
||||
RefCountedPtr<XdsLocalityName> locality_name) |
||||
: child_policy_(std::move(child_policy)), |
||||
cluster_name_(std::move(cluster_name)), |
||||
eds_service_name_(std::move(eds_service_name)), |
||||
lrs_load_reporting_server_name_( |
||||
std::move(lrs_load_reporting_server_name)), |
||||
locality_name_(std::move(locality_name)) {} |
||||
|
||||
const char* name() const override { return kLrs; } |
||||
|
||||
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const { |
||||
return child_policy_; |
||||
} |
||||
const std::string& cluster_name() const { return cluster_name_; } |
||||
const std::string& eds_service_name() const { return eds_service_name_; } |
||||
const std::string& lrs_load_reporting_server_name() const { |
||||
return lrs_load_reporting_server_name_; |
||||
}; |
||||
RefCountedPtr<XdsLocalityName> locality_name() const { |
||||
return locality_name_; |
||||
} |
||||
|
||||
private: |
||||
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_; |
||||
std::string cluster_name_; |
||||
std::string eds_service_name_; |
||||
std::string lrs_load_reporting_server_name_; |
||||
RefCountedPtr<XdsLocalityName> locality_name_; |
||||
}; |
||||
|
||||
// LRS LB policy.
|
||||
class LrsLb : public LoadBalancingPolicy { |
||||
public: |
||||
LrsLb(RefCountedPtr<XdsClient> xds_client, Args args); |
||||
|
||||
const char* name() const override { return kLrs; } |
||||
|
||||
void UpdateLocked(UpdateArgs args) override; |
||||
void ExitIdleLocked() override; |
||||
void ResetBackoffLocked() override; |
||||
|
||||
private: |
||||
// A simple wrapper for ref-counting a picker from the child policy.
|
||||
class RefCountedPicker : public RefCounted<RefCountedPicker> { |
||||
public: |
||||
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker) |
||||
: picker_(std::move(picker)) {} |
||||
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||
|
||||
private: |
||||
std::unique_ptr<SubchannelPicker> picker_; |
||||
}; |
||||
|
||||
// A picker that wraps the picker from the child to perform load reporting.
|
||||
class LoadReportingPicker : public SubchannelPicker { |
||||
public: |
||||
LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker, |
||||
RefCountedPtr<XdsClusterLocalityStats> locality_stats) |
||||
: picker_(std::move(picker)), |
||||
locality_stats_(std::move(locality_stats)) {} |
||||
|
||||
PickResult Pick(PickArgs args); |
||||
|
||||
private: |
||||
RefCountedPtr<RefCountedPicker> picker_; |
||||
RefCountedPtr<XdsClusterLocalityStats> locality_stats_; |
||||
}; |
||||
|
||||
class Helper : public ChannelControlHelper { |
||||
public: |
||||
explicit Helper(RefCountedPtr<LrsLb> lrs_policy) |
||||
: lrs_policy_(std::move(lrs_policy)) {} |
||||
|
||||
~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); } |
||||
|
||||
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||
const grpc_channel_args& args) override; |
||||
void UpdateState(grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker) override; |
||||
void RequestReresolution() override; |
||||
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||
|
||||
private: |
||||
RefCountedPtr<LrsLb> lrs_policy_; |
||||
}; |
||||
|
||||
~LrsLb(); |
||||
|
||||
void ShutdownLocked() override; |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||
const grpc_channel_args* args); |
||||
void UpdateChildPolicyLocked(ServerAddressList addresses, |
||||
const grpc_channel_args* args); |
||||
|
||||
void MaybeUpdatePickerLocked(); |
||||
|
||||
// Current config from the resolver.
|
||||
RefCountedPtr<LrsLbConfig> config_; |
||||
|
||||
// Internal state.
|
||||
bool shutting_down_ = false; |
||||
|
||||
// The xds client.
|
||||
RefCountedPtr<XdsClient> xds_client_; |
||||
|
||||
// The stats for client-side load reporting.
|
||||
RefCountedPtr<XdsClusterLocalityStats> locality_stats_; |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||
|
||||
// Latest state and picker reported by the child policy.
|
||||
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; |
||||
RefCountedPtr<RefCountedPicker> picker_; |
||||
}; |
||||
|
||||
//
|
||||
// LrsLb::LoadReportingPicker
|
||||
//
|
||||
|
||||
LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick( |
||||
LoadBalancingPolicy::PickArgs args) { |
||||
// Forward the pick to the picker returned from the child policy.
|
||||
PickResult result = picker_->Pick(args); |
||||
if (result.type == PickResult::PICK_COMPLETE && |
||||
result.subchannel != nullptr) { |
||||
// Record a call started.
|
||||
locality_stats_->AddCallStarted(); |
||||
// Intercept the recv_trailing_metadata op to record call completion.
|
||||
XdsClusterLocalityStats* locality_stats = |
||||
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release(); |
||||
result.recv_trailing_metadata_ready = |
||||
// Note: This callback does not run in either the control plane
|
||||
// work serializer or in the data plane mutex.
|
||||
[locality_stats](grpc_error* error, MetadataInterface* /*metadata*/, |
||||
CallState* /*call_state*/) { |
||||
const bool call_failed = error != GRPC_ERROR_NONE; |
||||
locality_stats->AddCallFinished(call_failed); |
||||
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call"); |
||||
}; |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
//
|
||||
// LrsLb
|
||||
//
|
||||
|
||||
LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args) |
||||
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel", |
||||
this, xds_client_.get()); |
||||
} |
||||
} |
||||
|
||||
LrsLb::~LrsLb() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] destroying xds LB policy", this); |
||||
} |
||||
} |
||||
|
||||
void LrsLb::ShutdownLocked() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] shutting down", this); |
||||
} |
||||
shutting_down_ = true; |
||||
// Remove the child policy's interested_parties pollset_set from the
|
||||
// xDS policy.
|
||||
if (child_policy_ != nullptr) { |
||||
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
||||
interested_parties()); |
||||
child_policy_.reset(); |
||||
} |
||||
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||
// the child.
|
||||
picker_.reset(); |
||||
locality_stats_.reset(); |
||||
xds_client_.reset(); |
||||
} |
||||
|
||||
void LrsLb::ExitIdleLocked() { |
||||
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); |
||||
} |
||||
|
||||
void LrsLb::ResetBackoffLocked() { |
||||
// The XdsClient will have its backoff reset by the xds resolver, so we
|
||||
// don't need to do it here.
|
||||
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); |
||||
} |
||||
|
||||
void LrsLb::UpdateLocked(UpdateArgs args) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] Received update", this); |
||||
} |
||||
// Update config.
|
||||
auto old_config = std::move(config_); |
||||
config_ = std::move(args.config); |
||||
// Update load reporting if needed.
|
||||
if (old_config == nullptr || |
||||
config_->lrs_load_reporting_server_name() != |
||||
old_config->lrs_load_reporting_server_name() || |
||||
config_->cluster_name() != old_config->cluster_name() || |
||||
config_->eds_service_name() != old_config->eds_service_name() || |
||||
*config_->locality_name() != *old_config->locality_name()) { |
||||
locality_stats_ = xds_client_->AddClusterLocalityStats( |
||||
config_->lrs_load_reporting_server_name(), config_->cluster_name(), |
||||
config_->eds_service_name(), config_->locality_name()); |
||||
MaybeUpdatePickerLocked(); |
||||
} |
||||
// Update child policy.
|
||||
UpdateChildPolicyLocked(std::move(args.addresses), args.args); |
||||
args.args = nullptr; // Ownership passed to UpdateChildPolicyLocked().
|
||||
} |
||||
|
||||
void LrsLb::MaybeUpdatePickerLocked() { |
||||
if (picker_ != nullptr) { |
||||
auto lrs_picker = |
||||
absl::make_unique<LoadReportingPicker>(picker_, locality_stats_); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] updating connectivity: state=%s picker=%p", |
||||
this, ConnectivityStateName(state_), lrs_picker.get()); |
||||
} |
||||
channel_control_helper()->UpdateState(state_, std::move(lrs_picker)); |
||||
} |
||||
} |
||||
|
||||
OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked( |
||||
const grpc_channel_args* args) { |
||||
LoadBalancingPolicy::Args lb_policy_args; |
||||
lb_policy_args.work_serializer = work_serializer(); |
||||
lb_policy_args.args = args; |
||||
lb_policy_args.channel_control_helper = |
||||
absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
||||
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||
&grpc_lb_lrs_trace); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] Created new child policy handler %p", this, |
||||
lb_policy.get()); |
||||
} |
||||
// Add our interested_parties pollset_set to that of the newly created
|
||||
// child policy. This will make the child policy progress upon activity on
|
||||
// this policy, which in turn is tied to the application's call.
|
||||
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
||||
interested_parties()); |
||||
return lb_policy; |
||||
} |
||||
|
||||
void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses, |
||||
const grpc_channel_args* args) { |
||||
// Create policy if needed.
|
||||
if (child_policy_ == nullptr) { |
||||
child_policy_ = CreateChildPolicyLocked(args); |
||||
} |
||||
// Construct update args.
|
||||
UpdateArgs update_args; |
||||
update_args.addresses = std::move(addresses); |
||||
update_args.config = config_->child_policy(); |
||||
update_args.args = args; |
||||
// Update the policy.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, "[lrs_lb %p] Updating child policy handler %p", this, |
||||
child_policy_.get()); |
||||
} |
||||
child_policy_->UpdateLocked(std::move(update_args)); |
||||
} |
||||
|
||||
//
|
||||
// LrsLb::Helper
|
||||
//
|
||||
|
||||
RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel( |
||||
const grpc_channel_args& args) { |
||||
if (lrs_policy_->shutting_down_) return nullptr; |
||||
return lrs_policy_->channel_control_helper()->CreateSubchannel(args); |
||||
} |
||||
|
||||
void LrsLb::Helper::UpdateState(grpc_connectivity_state state, |
||||
std::unique_ptr<SubchannelPicker> picker) { |
||||
if (lrs_policy_->shutting_down_) return; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"[lrs_lb %p] child connectivity state update: state=%s picker=%p", |
||||
lrs_policy_.get(), ConnectivityStateName(state), picker.get()); |
||||
} |
||||
// Save the state and picker.
|
||||
lrs_policy_->state_ = state; |
||||
lrs_policy_->picker_ = MakeRefCounted<RefCountedPicker>(std::move(picker)); |
||||
// Wrap the picker and return it to the channel.
|
||||
lrs_policy_->MaybeUpdatePickerLocked(); |
||||
} |
||||
|
||||
void LrsLb::Helper::RequestReresolution() { |
||||
if (lrs_policy_->shutting_down_) return; |
||||
lrs_policy_->channel_control_helper()->RequestReresolution(); |
||||
} |
||||
|
||||
void LrsLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { |
||||
if (lrs_policy_->shutting_down_) return; |
||||
lrs_policy_->channel_control_helper()->AddTraceEvent(severity, message); |
||||
} |
||||
|
||||
//
|
||||
// factory
|
||||
//
|
||||
|
||||
class LrsLbFactory : public LoadBalancingPolicyFactory { |
||||
public: |
||||
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||
LoadBalancingPolicy::Args args) const override { |
||||
RefCountedPtr<XdsClient> xds_client = |
||||
XdsClient::GetFromChannelArgs(*args.args); |
||||
if (xds_client == nullptr) { |
||||
gpr_log(GPR_ERROR, |
||||
"XdsClient not present in channel args -- cannot instantiate " |
||||
"lrs LB policy"); |
||||
return nullptr; |
||||
} |
||||
return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args)); |
||||
} |
||||
|
||||
const char* name() const override { return kLrs; } |
||||
|
||||
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||
const Json& json, grpc_error** error) const override { |
||||
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||
if (json.type() == Json::Type::JSON_NULL) { |
||||
// lrs was mentioned as a policy in the deprecated loadBalancingPolicy
|
||||
// field or in the client API.
|
||||
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:loadBalancingPolicy error:lrs policy requires configuration. " |
||||
"Please use loadBalancingConfig field of service config instead."); |
||||
return nullptr; |
||||
} |
||||
std::vector<grpc_error*> error_list; |
||||
// Child policy.
|
||||
RefCountedPtr<LoadBalancingPolicy::Config> child_policy; |
||||
auto it = json.object_value().find("childPolicy"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:childPolicy error:required field missing")); |
||||
} else { |
||||
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
||||
it->second, &parse_error); |
||||
if (child_policy == nullptr) { |
||||
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||
std::vector<grpc_error*> child_errors; |
||||
child_errors.push_back(parse_error); |
||||
error_list.push_back( |
||||
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
||||
} |
||||
} |
||||
// Cluster name.
|
||||
std::string cluster_name; |
||||
it = json.object_value().find("clusterName"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:clusterName error:required field missing")); |
||||
} else if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:clusterName error:type should be string")); |
||||
} else { |
||||
cluster_name = it->second.string_value(); |
||||
} |
||||
// EDS service name.
|
||||
std::string eds_service_name; |
||||
it = json.object_value().find("edsServiceName"); |
||||
if (it != json.object_value().end()) { |
||||
if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:edsServiceName error:type should be string")); |
||||
} else { |
||||
eds_service_name = it->second.string_value(); |
||||
} |
||||
} |
||||
// Locality.
|
||||
RefCountedPtr<XdsLocalityName> locality_name; |
||||
it = json.object_value().find("locality"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:locality error:required field missing")); |
||||
} else { |
||||
std::vector<grpc_error*> child_errors = |
||||
ParseLocality(it->second, &locality_name); |
||||
if (!child_errors.empty()) { |
||||
error_list.push_back( |
||||
GRPC_ERROR_CREATE_FROM_VECTOR("field:locality", &child_errors)); |
||||
} |
||||
} |
||||
// LRS load reporting server name.
|
||||
std::string lrs_load_reporting_server_name; |
||||
it = json.object_value().find("lrsLoadReportingServerName"); |
||||
if (it == json.object_value().end()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:lrsLoadReportingServerName error:required field missing")); |
||||
} else if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"field:lrsLoadReportingServerName error:type should be string")); |
||||
} else { |
||||
lrs_load_reporting_server_name = it->second.string_value(); |
||||
} |
||||
if (!error_list.empty()) { |
||||
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||
"lrs_experimental LB policy config", &error_list); |
||||
return nullptr; |
||||
} |
||||
return MakeRefCounted<LrsLbConfig>( |
||||
std::move(child_policy), std::move(cluster_name), |
||||
std::move(eds_service_name), std::move(lrs_load_reporting_server_name), |
||||
std::move(locality_name)); |
||||
} |
||||
|
||||
private: |
||||
static std::vector<grpc_error*> ParseLocality( |
||||
const Json& json, RefCountedPtr<XdsLocalityName>* name) { |
||||
std::vector<grpc_error*> error_list; |
||||
if (json.type() != Json::Type::OBJECT) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"locality field is not an object")); |
||||
return error_list; |
||||
} |
||||
std::string region; |
||||
auto it = json.object_value().find("region"); |
||||
if (it != json.object_value().end()) { |
||||
if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"\"region\" field is not a string")); |
||||
} else { |
||||
region = it->second.string_value(); |
||||
} |
||||
} |
||||
std::string zone; |
||||
it = json.object_value().find("zone"); |
||||
if (it != json.object_value().end()) { |
||||
if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"\"zone\" field is not a string")); |
||||
} else { |
||||
zone = it->second.string_value(); |
||||
} |
||||
} |
||||
std::string subzone; |
||||
it = json.object_value().find("subzone"); |
||||
if (it != json.object_value().end()) { |
||||
if (it->second.type() != Json::Type::STRING) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"\"subzone\" field is not a string")); |
||||
} else { |
||||
subzone = it->second.string_value(); |
||||
} |
||||
} |
||||
if (region.empty() && zone.empty() && subzone.empty()) { |
||||
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"at least one of region, zone, or subzone must be set")); |
||||
} |
||||
if (error_list.empty()) { |
||||
*name = MakeRefCounted<XdsLocalityName>(region, zone, subzone); |
||||
} |
||||
return error_list; |
||||
} |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
//
|
||||
// Plugin registration
|
||||
//
|
||||
|
||||
void grpc_lb_policy_lrs_init() { |
||||
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||
RegisterLoadBalancingPolicyFactory( |
||||
absl::make_unique<grpc_core::LrsLbFactory>()); |
||||
} |
||||
|
||||
void grpc_lb_policy_lrs_shutdown() {} |
File diff suppressed because it is too large
Load Diff
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue