commit
d7c916be49
140 changed files with 5649 additions and 3985 deletions
@ -0,0 +1,83 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2020 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
|
||||||
|
#define GRPC_ARG_HIERARCHICAL_PATH "grpc.internal.address.hierarchical_path" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
void* HierarchicalPathCopy(void* p) { |
||||||
|
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p); |
||||||
|
return static_cast<void*>(new std::vector<std::string>(*path)); |
||||||
|
} |
||||||
|
|
||||||
|
void HierarchicalPathDestroy(void* p) { |
||||||
|
std::vector<std::string>* path = static_cast<std::vector<std::string>*>(p); |
||||||
|
delete path; |
||||||
|
} |
||||||
|
|
||||||
|
int HierarchicalPathCompare(void* p1, void* p2) { |
||||||
|
std::vector<std::string>* path1 = static_cast<std::vector<std::string>*>(p1); |
||||||
|
std::vector<std::string>* path2 = static_cast<std::vector<std::string>*>(p2); |
||||||
|
for (size_t i = 0; i < path1->size(); ++i) { |
||||||
|
if (path2->size() == i) return 1; |
||||||
|
int r = (*path1)[i].compare((*path2)[i]); |
||||||
|
if (r != 0) return r; |
||||||
|
} |
||||||
|
if (path2->size() > path1->size()) return -1; |
||||||
|
return 0; |
||||||
|
} |
||||||
|
|
||||||
|
const grpc_arg_pointer_vtable hierarchical_path_arg_vtable = { |
||||||
|
HierarchicalPathCopy, HierarchicalPathDestroy, HierarchicalPathCompare}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path) { |
||||||
|
return grpc_channel_arg_pointer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_HIERARCHICAL_PATH), |
||||||
|
const_cast<std::vector<std::string>*>(&path), |
||||||
|
&hierarchical_path_arg_vtable); |
||||||
|
} |
||||||
|
|
||||||
|
HierarchicalAddressMap MakeHierarchicalAddressMap( |
||||||
|
const ServerAddressList& addresses) { |
||||||
|
HierarchicalAddressMap result; |
||||||
|
for (const ServerAddress& address : addresses) { |
||||||
|
auto* path = grpc_channel_args_find_pointer<std::vector<std::string>>( |
||||||
|
address.args(), GRPC_ARG_HIERARCHICAL_PATH); |
||||||
|
if (path == nullptr || path->empty()) continue; |
||||||
|
auto it = path->begin(); |
||||||
|
ServerAddressList& target_list = result[*it]; |
||||||
|
++it; |
||||||
|
std::vector<std::string> remaining_path(it, path->end()); |
||||||
|
const char* name_to_remove = GRPC_ARG_HIERARCHICAL_PATH; |
||||||
|
grpc_arg new_arg = MakeHierarchicalPathArg(remaining_path); |
||||||
|
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( |
||||||
|
address.args(), &name_to_remove, 1, &new_arg, 1); |
||||||
|
target_list.emplace_back(address.address(), new_args); |
||||||
|
} |
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,99 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2020 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <map> |
||||||
|
#include <string> |
||||||
|
#include <vector> |
||||||
|
|
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/server_address.h" |
||||||
|
|
||||||
|
// The resolver returns a flat list of addresses. When a hierarchy of
|
||||||
|
// LB policies is in use, each leaf of the hierarchy will need a
|
||||||
|
// different subset of those addresses. This library provides a
|
||||||
|
// mechanism for determining which address is passed to which leaf
|
||||||
|
// policy.
|
||||||
|
//
|
||||||
|
// Each address will have an associated path that indicates which child
|
||||||
|
// it should be sent to at each level of the hierarchy to wind up at the
|
||||||
|
// right leaf policy. Each LB policy will look at the first element of
|
||||||
|
// the path of each address to determine which child to send the address
|
||||||
|
// to. It will then remove that first element when passing the address
|
||||||
|
// down to its child.
|
||||||
|
//
|
||||||
|
// For example, consider the following LB policy hierarchy:
|
||||||
|
//
|
||||||
|
// - priority
|
||||||
|
// - child0 (weighted_target)
|
||||||
|
// - localityA (round_robin)
|
||||||
|
// - localityB (round_robin)
|
||||||
|
// - child1 (weighted_target)
|
||||||
|
// - localityC (round_robin)
|
||||||
|
// - localityD (round_robin)
|
||||||
|
//
|
||||||
|
// Now consider the following addresses:
|
||||||
|
// - 10.0.0.1:80 path=["child0", "localityA"]
|
||||||
|
// - 10.0.0.2:80 path=["child0", "localityB"]
|
||||||
|
// - 10.0.0.3:80 path=["child1", "localityC"]
|
||||||
|
// - 10.0.0.4:80 path=["child1", "localityD"]
|
||||||
|
//
|
||||||
|
// The priority policy will split this up into two lists, one for each
|
||||||
|
// of its children:
|
||||||
|
// - child0:
|
||||||
|
// - 10.0.0.1:80 path=["localityA"]
|
||||||
|
// - 10.0.0.2:80 path=["localityB"]
|
||||||
|
// - child1:
|
||||||
|
// - 10.0.0.3:80 path=["localityC"]
|
||||||
|
// - 10.0.0.4:80 path=["localityD"]
|
||||||
|
//
|
||||||
|
// The weighted_target policy for child0 will split its list up into two
|
||||||
|
// lists, one for each of its children:
|
||||||
|
// - localityA:
|
||||||
|
// - 10.0.0.1:80 path=[]
|
||||||
|
// - localityB:
|
||||||
|
// - 10.0.0.2:80 path=[]
|
||||||
|
//
|
||||||
|
// Similarly, the weighted_target policy for child1 will split its list
|
||||||
|
// up into two lists, one for each of its children:
|
||||||
|
// - localityC:
|
||||||
|
// - 10.0.0.3:80 path=[]
|
||||||
|
// - localityD:
|
||||||
|
// - 10.0.0.4:80 path=[]
|
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// Constructs a channel arg containing the hierarchical path
|
||||||
|
// to be associated with an address.
|
||||||
|
grpc_arg MakeHierarchicalPathArg(const std::vector<std::string>& path); |
||||||
|
|
||||||
|
// A map from the next path element to the addresses that fall under
|
||||||
|
// that path element.
|
||||||
|
using HierarchicalAddressMap = std::map<std::string, ServerAddressList>; |
||||||
|
|
||||||
|
// Splits up the addresses into a separate list for each child.
|
||||||
|
HierarchicalAddressMap MakeHierarchicalAddressMap( |
||||||
|
const ServerAddressList& addresses); |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ADDRESS_FILTERING_H \ |
||||||
|
*/ |
@ -0,0 +1,89 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2019 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h" |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gpr/useful.h" |
||||||
|
|
||||||
|
// Channel arg key for the list of balancer addresses.
|
||||||
|
#define GRPC_ARG_GRPCLB_BALANCER_ADDRESSES "grpc.grpclb_balancer_addresses" |
||||||
|
// Channel arg key for a string indicating an address's balancer name.
|
||||||
|
#define GRPC_ARG_ADDRESS_BALANCER_NAME "grpc.address_balancer_name" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
void* BalancerAddressesArgCopy(void* p) { |
||||||
|
ServerAddressList* address_list = static_cast<ServerAddressList*>(p); |
||||||
|
return new ServerAddressList(*address_list); |
||||||
|
} |
||||||
|
|
||||||
|
void BalancerAddressesArgDestroy(void* p) { |
||||||
|
ServerAddressList* address_list = static_cast<ServerAddressList*>(p); |
||||||
|
delete address_list; |
||||||
|
} |
||||||
|
|
||||||
|
int BalancerAddressesArgCmp(void* p, void* q) { |
||||||
|
ServerAddressList* address_list1 = static_cast<ServerAddressList*>(p); |
||||||
|
ServerAddressList* address_list2 = static_cast<ServerAddressList*>(q); |
||||||
|
if (address_list1 == nullptr || address_list2 == nullptr) { |
||||||
|
return GPR_ICMP(address_list1, address_list2); |
||||||
|
} |
||||||
|
if (address_list1->size() > address_list2->size()) return 1; |
||||||
|
if (address_list1->size() < address_list2->size()) return -1; |
||||||
|
for (size_t i = 0; i < address_list1->size(); ++i) { |
||||||
|
int retval = (*address_list1)[i].Cmp((*address_list2)[i]); |
||||||
|
if (retval != 0) return retval; |
||||||
|
} |
||||||
|
return 0; |
||||||
|
} |
||||||
|
|
||||||
|
const grpc_arg_pointer_vtable kBalancerAddressesArgVtable = { |
||||||
|
BalancerAddressesArgCopy, BalancerAddressesArgDestroy, |
||||||
|
BalancerAddressesArgCmp}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
grpc_arg CreateGrpclbBalancerAddressesArg( |
||||||
|
const ServerAddressList* address_list) { |
||||||
|
return grpc_channel_arg_pointer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_GRPCLB_BALANCER_ADDRESSES), |
||||||
|
const_cast<ServerAddressList*>(address_list), |
||||||
|
&kBalancerAddressesArgVtable); |
||||||
|
} |
||||||
|
|
||||||
|
const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs( |
||||||
|
const grpc_channel_args& args) { |
||||||
|
return grpc_channel_args_find_pointer<const ServerAddressList>( |
||||||
|
&args, const_cast<char*>(GRPC_ARG_GRPCLB_BALANCER_ADDRESSES)); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_arg CreateGrpclbBalancerNameArg(const char* balancer_name) { |
||||||
|
return grpc_channel_arg_string_create( |
||||||
|
const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), |
||||||
|
const_cast<char*>(balancer_name)); |
||||||
|
} |
||||||
|
|
||||||
|
const char* FindGrpclbBalancerNameInChannelArgs(const grpc_channel_args& args) { |
||||||
|
return grpc_channel_args_find_string( |
||||||
|
&args, const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME)); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,40 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2019 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <grpc/impl/codegen/grpc_types.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/server_address.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
grpc_arg CreateGrpclbBalancerAddressesArg( |
||||||
|
const ServerAddressList* address_list); |
||||||
|
const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs( |
||||||
|
const grpc_channel_args& args); |
||||||
|
|
||||||
|
grpc_arg CreateGrpclbBalancerNameArg(const char* balancer_name); |
||||||
|
const char* FindGrpclbBalancerNameInChannelArgs(const grpc_channel_args& args); |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif /* \ |
||||||
|
GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_BALANCER_ADDRESSES_H \
|
||||||
|
*/ |
@ -0,0 +1,875 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2018 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <inttypes.h> |
||||||
|
#include <limits.h> |
||||||
|
|
||||||
|
#include "absl/strings/str_cat.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/combiner.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_lb_priority_trace(false, "priority_lb"); |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
constexpr char kPriority[] = "priority_experimental"; |
||||||
|
|
||||||
|
// How long we keep a child around for after it is no longer being used
|
||||||
|
// (either because it has been removed from the config or because we
|
||||||
|
// have switched to a higher-priority child).
|
||||||
|
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000; |
||||||
|
|
||||||
|
// Default for how long we wait for a newly created child to get connected
|
||||||
|
// before starting to attempt the next priority. Overridable via channel arg.
|
||||||
|
constexpr int kDefaultChildFailoverTimeoutMs = 10000; |
||||||
|
|
||||||
|
// Config for priority LB policy.
|
||||||
|
class PriorityLbConfig : public LoadBalancingPolicy::Config { |
||||||
|
public: |
||||||
|
PriorityLbConfig( |
||||||
|
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> |
||||||
|
children, |
||||||
|
std::vector<std::string> priorities) |
||||||
|
: children_(std::move(children)), priorities_(std::move(priorities)) {} |
||||||
|
|
||||||
|
const char* name() const override { return kPriority; } |
||||||
|
|
||||||
|
const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>& |
||||||
|
children() const { |
||||||
|
return children_; |
||||||
|
} |
||||||
|
const std::vector<std::string>& priorities() const { return priorities_; } |
||||||
|
|
||||||
|
private: |
||||||
|
const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> |
||||||
|
children_; |
||||||
|
const std::vector<std::string> priorities_; |
||||||
|
}; |
||||||
|
|
||||||
|
// priority LB policy.
|
||||||
|
class PriorityLb : public LoadBalancingPolicy { |
||||||
|
public: |
||||||
|
explicit PriorityLb(Args args); |
||||||
|
|
||||||
|
const char* name() const override { return kPriority; } |
||||||
|
|
||||||
|
void UpdateLocked(UpdateArgs args) override; |
||||||
|
void ExitIdleLocked() override; |
||||||
|
void ResetBackoffLocked() override; |
||||||
|
|
||||||
|
private: |
||||||
|
// Each ChildPriority holds a ref to the PriorityLb.
|
||||||
|
class ChildPriority : public InternallyRefCounted<ChildPriority> { |
||||||
|
public: |
||||||
|
ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name); |
||||||
|
|
||||||
|
~ChildPriority() { |
||||||
|
priority_policy_.reset(DEBUG_LOCATION, "ChildPriority"); |
||||||
|
} |
||||||
|
|
||||||
|
const std::string& name() const { return name_; } |
||||||
|
|
||||||
|
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config); |
||||||
|
void ExitIdleLocked(); |
||||||
|
void ResetBackoffLocked(); |
||||||
|
void DeactivateLocked(); |
||||||
|
void MaybeReactivateLocked(); |
||||||
|
void MaybeCancelFailoverTimerLocked(); |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
std::unique_ptr<SubchannelPicker> GetPicker() { |
||||||
|
return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_connectivity_state connectivity_state() const { |
||||||
|
return connectivity_state_; |
||||||
|
} |
||||||
|
bool failover_timer_callback_pending() const { |
||||||
|
return failover_timer_callback_pending_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
// A simple wrapper for ref-counting a picker from the child policy.
|
||||||
|
class RefCountedPicker : public RefCounted<RefCountedPicker> { |
||||||
|
public: |
||||||
|
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker) |
||||||
|
: picker_(std::move(picker)) {} |
||||||
|
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||||
|
|
||||||
|
private: |
||||||
|
std::unique_ptr<SubchannelPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
// A non-ref-counted wrapper for RefCountedPicker.
|
||||||
|
class RefCountedPickerWrapper : public SubchannelPicker { |
||||||
|
public: |
||||||
|
explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker) |
||||||
|
: picker_(std::move(picker)) {} |
||||||
|
PickResult Pick(PickArgs args) override { return picker_->Pick(args); } |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<RefCountedPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
class Helper : public ChannelControlHelper { |
||||||
|
public: |
||||||
|
explicit Helper(RefCountedPtr<ChildPriority> priority) |
||||||
|
: priority_(std::move(priority)) {} |
||||||
|
|
||||||
|
~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
const grpc_channel_args& args) override; |
||||||
|
void UpdateState(grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) override; |
||||||
|
void RequestReresolution() override; |
||||||
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<ChildPriority> priority_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Methods for dealing with the child policy.
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args); |
||||||
|
|
||||||
|
void OnConnectivityStateUpdateLocked( |
||||||
|
grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker); |
||||||
|
|
||||||
|
void StartFailoverTimerLocked(); |
||||||
|
|
||||||
|
static void OnFailoverTimer(void* arg, grpc_error* error); |
||||||
|
static void OnFailoverTimerLocked(void* arg, grpc_error* error); |
||||||
|
static void OnDeactivationTimer(void* arg, grpc_error* error); |
||||||
|
static void OnDeactivationTimerLocked(void* arg, grpc_error* error); |
||||||
|
|
||||||
|
RefCountedPtr<PriorityLb> priority_policy_; |
||||||
|
const std::string name_; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||||
|
|
||||||
|
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; |
||||||
|
RefCountedPtr<RefCountedPicker> picker_wrapper_; |
||||||
|
|
||||||
|
// States for delayed removal.
|
||||||
|
grpc_timer deactivation_timer_; |
||||||
|
grpc_closure on_deactivation_timer_; |
||||||
|
bool deactivation_timer_callback_pending_ = false; |
||||||
|
|
||||||
|
// States of failover.
|
||||||
|
grpc_timer failover_timer_; |
||||||
|
grpc_closure on_failover_timer_; |
||||||
|
grpc_closure on_failover_timer_locked_; |
||||||
|
bool failover_timer_callback_pending_ = false; |
||||||
|
}; |
||||||
|
|
||||||
|
~PriorityLb(); |
||||||
|
|
||||||
|
void ShutdownLocked() override; |
||||||
|
|
||||||
|
// Returns UINT32_MAX if child is not in current priority list.
|
||||||
|
uint32_t GetChildPriorityLocked(const std::string& child_name) const; |
||||||
|
|
||||||
|
void HandleChildConnectivityStateChangeLocked(ChildPriority* child); |
||||||
|
void DeleteChild(ChildPriority* child); |
||||||
|
|
||||||
|
void TryNextPriorityLocked(bool report_connecting); |
||||||
|
void SelectPriorityLocked(uint32_t priority); |
||||||
|
|
||||||
|
const int child_failover_timeout_ms_; |
||||||
|
|
||||||
|
// Current channel args and config from the resolver.
|
||||||
|
const grpc_channel_args* args_ = nullptr; |
||||||
|
RefCountedPtr<PriorityLbConfig> config_; |
||||||
|
HierarchicalAddressMap addresses_; |
||||||
|
|
||||||
|
// Internal state.
|
||||||
|
bool shutting_down_ = false; |
||||||
|
|
||||||
|
std::map<std::string, OrphanablePtr<ChildPriority>> children_; |
||||||
|
// The priority that is being used.
|
||||||
|
uint32_t current_priority_ = UINT32_MAX; |
||||||
|
// Points to the current child from before the most recent update.
|
||||||
|
// We will continue to use this child until we decide which of the new
|
||||||
|
// children to use.
|
||||||
|
ChildPriority* current_child_from_before_update_ = nullptr; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// PriorityLb
|
||||||
|
//
|
||||||
|
|
||||||
|
PriorityLb::PriorityLb(Args args) |
||||||
|
: LoadBalancingPolicy(std::move(args)), |
||||||
|
child_failover_timeout_ms_(grpc_channel_args_find_integer( |
||||||
|
args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, |
||||||
|
{kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] created", this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
PriorityLb::~PriorityLb() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this); |
||||||
|
} |
||||||
|
grpc_channel_args_destroy(args_); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ShutdownLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this); |
||||||
|
} |
||||||
|
shutting_down_ = true; |
||||||
|
children_.clear(); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ExitIdleLocked() { |
||||||
|
if (current_priority_ != UINT32_MAX) { |
||||||
|
const std::string& child_name = config_->priorities()[current_priority_]; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] exiting IDLE for current priority %d child %s", |
||||||
|
this, current_priority_, child_name.c_str()); |
||||||
|
} |
||||||
|
children_[child_name]->ExitIdleLocked(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ResetBackoffLocked() { |
||||||
|
for (const auto& p : children_) p.second->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::UpdateLocked(UpdateArgs args) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] received update", this); |
||||||
|
} |
||||||
|
// Save current child.
|
||||||
|
if (current_priority_ != UINT32_MAX) { |
||||||
|
const std::string& child_name = config_->priorities()[current_priority_]; |
||||||
|
current_child_from_before_update_ = children_[child_name].get(); |
||||||
|
// Unset current_priority_, since it was an index into the old
|
||||||
|
// config's priority list and may no longer be valid. It will be
|
||||||
|
// reset later by TryNextPriorityLocked(), but we unset it here in
|
||||||
|
// case updating any of our children triggers a state update.
|
||||||
|
current_priority_ = UINT32_MAX; |
||||||
|
} |
||||||
|
// Update config.
|
||||||
|
config_ = std::move(args.config); |
||||||
|
// Update args.
|
||||||
|
grpc_channel_args_destroy(args_); |
||||||
|
args_ = args.args; |
||||||
|
args.args = nullptr; |
||||||
|
// Update addresses.
|
||||||
|
addresses_ = MakeHierarchicalAddressMap(args.addresses); |
||||||
|
// Check all existing children against the new config.
|
||||||
|
for (const auto& p : children_) { |
||||||
|
const std::string& child_name = p.first; |
||||||
|
auto& child = p.second; |
||||||
|
auto config_it = config_->children().find(child_name); |
||||||
|
if (config_it == config_->children().end()) { |
||||||
|
// Existing child not found in new config. Deactivate it.
|
||||||
|
child->DeactivateLocked(); |
||||||
|
} else { |
||||||
|
// Existing child found in new config. Update it.
|
||||||
|
child->UpdateLocked(config_it->second); |
||||||
|
} |
||||||
|
} |
||||||
|
// Try to get connected.
|
||||||
|
TryNextPriorityLocked(/*report_connecting=*/children_.empty()); |
||||||
|
} |
||||||
|
|
||||||
|
uint32_t PriorityLb::GetChildPriorityLocked( |
||||||
|
const std::string& child_name) const { |
||||||
|
for (uint32_t priority = 0; priority < config_->priorities().size(); |
||||||
|
++priority) { |
||||||
|
if (config_->priorities()[priority] == child_name) return priority; |
||||||
|
} |
||||||
|
return UINT32_MAX; |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::HandleChildConnectivityStateChangeLocked( |
||||||
|
ChildPriority* child) { |
||||||
|
// Special case for the child that was the current child before the
|
||||||
|
// most recent update.
|
||||||
|
if (child == current_child_from_before_update_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] state update for current child from before " |
||||||
|
"config update", |
||||||
|
this); |
||||||
|
} |
||||||
|
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||||
|
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||||
|
// If it's still READY or IDLE, we stick with this child, so pass
|
||||||
|
// the new picker up to our parent.
|
||||||
|
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||||
|
child->GetPicker()); |
||||||
|
} else { |
||||||
|
// If it's no longer READY or IDLE, we should stop using it.
|
||||||
|
// We already started trying other priorities as a result of the
|
||||||
|
// update, but calling TryNextPriorityLocked() ensures that we will
|
||||||
|
// properly select between CONNECTING and TRANSIENT_FAILURE as the
|
||||||
|
// new state to report to our parent.
|
||||||
|
current_child_from_before_update_ = nullptr; |
||||||
|
TryNextPriorityLocked(/*report_connecting=*/true); |
||||||
|
} |
||||||
|
return; |
||||||
|
} |
||||||
|
// Otherwise, find the child's priority.
|
||||||
|
uint32_t child_priority = GetChildPriorityLocked(child->name()); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] state update for priority %d, child %s", |
||||||
|
this, child_priority, child->name().c_str()); |
||||||
|
} |
||||||
|
// Ignore priorities not in the current config.
|
||||||
|
if (child_priority == UINT32_MAX) return; |
||||||
|
// Ignore lower-than-current priorities.
|
||||||
|
if (child_priority > current_priority_) return; |
||||||
|
// If a child reports TRANSIENT_FAILURE, start trying the next priority.
|
||||||
|
// Note that even if this is for a higher-than-current priority, we
|
||||||
|
// may still need to create some children between this priority and
|
||||||
|
// the current one (e.g., if we got an update that inserted new
|
||||||
|
// priorities ahead of the current one).
|
||||||
|
if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||||
|
TryNextPriorityLocked( |
||||||
|
/*report_connecting=*/child_priority == current_priority_); |
||||||
|
return; |
||||||
|
} |
||||||
|
// The update is for a higher-than-current priority (or for any
|
||||||
|
// priority if we don't have any current priority).
|
||||||
|
if (child_priority < current_priority_) { |
||||||
|
// If the child reports READY or IDLE, switch to that priority.
|
||||||
|
// Otherwise, ignore the update.
|
||||||
|
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||||
|
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||||
|
SelectPriorityLocked(child_priority); |
||||||
|
} |
||||||
|
return; |
||||||
|
} |
||||||
|
// The current priority has returned a new picker, so pass it up to
|
||||||
|
// our parent.
|
||||||
|
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||||
|
child->GetPicker()); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::DeleteChild(ChildPriority* child) { |
||||||
|
// If this was the current child from before the most recent update,
|
||||||
|
// stop using it. We already started trying other priorities as a
|
||||||
|
// result of the update, but calling TryNextPriorityLocked() ensures that
|
||||||
|
// we will properly select between CONNECTING and TRANSIENT_FAILURE as the
|
||||||
|
// new state to report to our parent.
|
||||||
|
if (current_child_from_before_update_ == child) { |
||||||
|
current_child_from_before_update_ = nullptr; |
||||||
|
TryNextPriorityLocked(/*report_connecting=*/true); |
||||||
|
} |
||||||
|
children_.erase(child->name()); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::TryNextPriorityLocked(bool report_connecting) { |
||||||
|
for (uint32_t priority = 0; priority < config_->priorities().size(); |
||||||
|
++priority) { |
||||||
|
// If the child for the priority does not exist yet, create it.
|
||||||
|
const std::string& child_name = config_->priorities()[priority]; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] trying priority %d, child %s", this, |
||||||
|
priority, child_name.c_str()); |
||||||
|
} |
||||||
|
auto& child = children_[child_name]; |
||||||
|
if (child == nullptr) { |
||||||
|
if (report_connecting) { |
||||||
|
channel_control_helper()->UpdateState( |
||||||
|
GRPC_CHANNEL_CONNECTING, |
||||||
|
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"))); |
||||||
|
} |
||||||
|
child = MakeOrphanable<ChildPriority>( |
||||||
|
Ref(DEBUG_LOCATION, "ChildPriority"), child_name); |
||||||
|
child->UpdateLocked(config_->children().find(child_name)->second); |
||||||
|
return; |
||||||
|
} |
||||||
|
// The child already exists.
|
||||||
|
child->MaybeReactivateLocked(); |
||||||
|
// If the child is in state READY or IDLE, switch to it.
|
||||||
|
if (child->connectivity_state() == GRPC_CHANNEL_READY || |
||||||
|
child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
||||||
|
SelectPriorityLocked(priority); |
||||||
|
return; |
||||||
|
} |
||||||
|
// Child is not READY or IDLE.
|
||||||
|
// If its failover timer is still pending, give it time to fire.
|
||||||
|
if (child->failover_timer_callback_pending()) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] priority %d, child %s: child still " |
||||||
|
"attempting to connect, will wait", |
||||||
|
this, priority, child_name.c_str()); |
||||||
|
} |
||||||
|
if (report_connecting) { |
||||||
|
channel_control_helper()->UpdateState( |
||||||
|
GRPC_CHANNEL_CONNECTING, |
||||||
|
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"))); |
||||||
|
} |
||||||
|
return; |
||||||
|
} |
||||||
|
// Child has been failing for a while. Move on to the next priority.
|
||||||
|
} |
||||||
|
// If there are no more priorities to try, report TRANSIENT_FAILURE.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] no priority reachable, putting channel in " |
||||||
|
"TRANSIENT_FAILURE", |
||||||
|
this); |
||||||
|
} |
||||||
|
current_priority_ = UINT32_MAX; |
||||||
|
current_child_from_before_update_ = nullptr; |
||||||
|
grpc_error* error = grpc_error_set_int( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"), |
||||||
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
||||||
|
channel_control_helper()->UpdateState( |
||||||
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
||||||
|
absl::make_unique<TransientFailurePicker>(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::SelectPriorityLocked(uint32_t priority) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] selected priority %d, child %s", this, |
||||||
|
priority, config_->priorities()[priority].c_str()); |
||||||
|
} |
||||||
|
current_priority_ = priority; |
||||||
|
current_child_from_before_update_ = nullptr; |
||||||
|
// Deactivate lower priorities.
|
||||||
|
for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) { |
||||||
|
const std::string& child_name = config_->priorities()[p]; |
||||||
|
auto it = children_.find(child_name); |
||||||
|
if (it != children_.end()) it->second->DeactivateLocked(); |
||||||
|
} |
||||||
|
// Update picker.
|
||||||
|
auto& child = children_[config_->priorities()[priority]]; |
||||||
|
channel_control_helper()->UpdateState(child->connectivity_state(), |
||||||
|
child->GetPicker()); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// PriorityLb::ChildPriority
|
||||||
|
//
|
||||||
|
|
||||||
|
PriorityLb::ChildPriority::ChildPriority( |
||||||
|
RefCountedPtr<PriorityLb> priority_policy, std::string name) |
||||||
|
: priority_policy_(std::move(priority_policy)), name_(std::move(name)) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", |
||||||
|
priority_policy_.get(), name_.c_str(), this); |
||||||
|
} |
||||||
|
GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&on_failover_timer_locked_, OnFailoverTimerLocked, this, |
||||||
|
nullptr); |
||||||
|
// Start the failover timer.
|
||||||
|
StartFailoverTimerLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::Orphan() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned", |
||||||
|
priority_policy_.get(), name_.c_str(), this); |
||||||
|
} |
||||||
|
MaybeCancelFailoverTimerLocked(); |
||||||
|
if (deactivation_timer_callback_pending_) { |
||||||
|
grpc_timer_cancel(&deactivation_timer_); |
||||||
|
} |
||||||
|
// Remove the child policy's interested_parties pollset_set from the
|
||||||
|
// xDS policy.
|
||||||
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
||||||
|
priority_policy_->interested_parties()); |
||||||
|
child_policy_.reset(); |
||||||
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||||
|
// the child.
|
||||||
|
picker_wrapper_.reset(); |
||||||
|
if (deactivation_timer_callback_pending_) { |
||||||
|
grpc_timer_cancel(&deactivation_timer_); |
||||||
|
} |
||||||
|
Unref(DEBUG_LOCATION, "ChildPriority+Orphan"); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::UpdateLocked( |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> config) { |
||||||
|
if (priority_policy_->shutting_down_) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", |
||||||
|
priority_policy_.get(), name_.c_str(), this); |
||||||
|
} |
||||||
|
// Create policy if needed.
|
||||||
|
if (child_policy_ == nullptr) { |
||||||
|
child_policy_ = CreateChildPolicyLocked(priority_policy_->args_); |
||||||
|
} |
||||||
|
// Construct update args.
|
||||||
|
UpdateArgs update_args; |
||||||
|
update_args.config = std::move(config); |
||||||
|
update_args.addresses = priority_policy_->addresses_[name_]; |
||||||
|
update_args.args = grpc_channel_args_copy(priority_policy_->args_); |
||||||
|
// Update the policy.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): updating child policy handler %p", |
||||||
|
priority_policy_.get(), name_.c_str(), this, child_policy_.get()); |
||||||
|
} |
||||||
|
child_policy_->UpdateLocked(std::move(update_args)); |
||||||
|
} |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> |
||||||
|
PriorityLb::ChildPriority::CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args) { |
||||||
|
LoadBalancingPolicy::Args lb_policy_args; |
||||||
|
lb_policy_args.combiner = priority_policy_->combiner(); |
||||||
|
lb_policy_args.args = args; |
||||||
|
lb_policy_args.channel_control_helper = |
||||||
|
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||||
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||||
|
&grpc_lb_priority_trace); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): created new child policy " |
||||||
|
"handler %p", |
||||||
|
priority_policy_.get(), name_.c_str(), this, lb_policy.get()); |
||||||
|
} |
||||||
|
// Add the parent's interested_parties pollset_set to that of the newly
|
||||||
|
// created child policy. This will make the child policy progress upon
|
||||||
|
// activity on the parent LB, which in turn is tied to the application's call.
|
||||||
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
||||||
|
priority_policy_->interested_parties()); |
||||||
|
return lb_policy; |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::ExitIdleLocked() { |
||||||
|
if (connectivity_state_ == GRPC_CHANNEL_IDLE && |
||||||
|
!failover_timer_callback_pending_) { |
||||||
|
StartFailoverTimerLocked(); |
||||||
|
} |
||||||
|
child_policy_->ExitIdleLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::ResetBackoffLocked() { |
||||||
|
child_policy_->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked( |
||||||
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): state update: %s, picker %p", |
||||||
|
priority_policy_.get(), name_.c_str(), this, |
||||||
|
ConnectivityStateName(state), picker.get()); |
||||||
|
} |
||||||
|
// Store the state and picker.
|
||||||
|
connectivity_state_ = state; |
||||||
|
picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker)); |
||||||
|
// If READY or TRANSIENT_FAILURE, cancel failover timer.
|
||||||
|
if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||||
|
MaybeCancelFailoverTimerLocked(); |
||||||
|
} |
||||||
|
// Notify the parent policy.
|
||||||
|
priority_policy_->HandleChildConnectivityStateChangeLocked(this); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::StartFailoverTimerLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): starting failover timer for %d ms", |
||||||
|
priority_policy_.get(), name_.c_str(), this, |
||||||
|
priority_policy_->child_failover_timeout_ms_); |
||||||
|
} |
||||||
|
Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release(); |
||||||
|
grpc_timer_init( |
||||||
|
&failover_timer_, |
||||||
|
ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_, |
||||||
|
&on_failover_timer_); |
||||||
|
failover_timer_callback_pending_ = true; |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() { |
||||||
|
if (failover_timer_callback_pending_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): cancelling failover timer", |
||||||
|
priority_policy_.get(), name_.c_str(), this); |
||||||
|
} |
||||||
|
grpc_timer_cancel(&failover_timer_); |
||||||
|
failover_timer_callback_pending_ = false; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) { |
||||||
|
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||||
|
self->priority_policy_->combiner()->Run(&self->on_failover_timer_locked_, |
||||||
|
GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::OnFailoverTimerLocked(void* arg, |
||||||
|
grpc_error* error) { |
||||||
|
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE && self->failover_timer_callback_pending_ && |
||||||
|
!self->priority_policy_->shutting_down_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): failover timer fired, " |
||||||
|
"reporting TRANSIENT_FAILURE", |
||||||
|
self->priority_policy_.get(), self->name_.c_str(), self); |
||||||
|
} |
||||||
|
self->failover_timer_callback_pending_ = false; |
||||||
|
self->OnConnectivityStateUpdateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, |
||||||
|
nullptr); |
||||||
|
} |
||||||
|
self->Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked"); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::DeactivateLocked() { |
||||||
|
// If already deactivated, don't do it again.
|
||||||
|
if (deactivation_timer_callback_pending_) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): deactivating -- will remove in %d " |
||||||
|
"ms.", |
||||||
|
priority_policy_.get(), name_.c_str(), this, |
||||||
|
kChildRetentionIntervalMs); |
||||||
|
} |
||||||
|
MaybeCancelFailoverTimerLocked(); |
||||||
|
// Start a timer to delete the child.
|
||||||
|
Ref(DEBUG_LOCATION, "ChildPriority+timer").release(); |
||||||
|
GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
grpc_timer_init(&deactivation_timer_, |
||||||
|
ExecCtx::Get()->Now() + kChildRetentionIntervalMs, |
||||||
|
&on_deactivation_timer_); |
||||||
|
deactivation_timer_callback_pending_ = true; |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::MaybeReactivateLocked() { |
||||||
|
if (deactivation_timer_callback_pending_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", |
||||||
|
priority_policy_.get(), name_.c_str(), this); |
||||||
|
} |
||||||
|
deactivation_timer_callback_pending_ = false; |
||||||
|
grpc_timer_cancel(&deactivation_timer_); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg, |
||||||
|
grpc_error* error) { |
||||||
|
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||||
|
self->priority_policy_->combiner()->Run( |
||||||
|
GRPC_CLOSURE_INIT(&self->on_deactivation_timer_, |
||||||
|
OnDeactivationTimerLocked, self, nullptr), |
||||||
|
GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::OnDeactivationTimerLocked(void* arg, |
||||||
|
grpc_error* error) { |
||||||
|
ChildPriority* self = static_cast<ChildPriority*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE && self->deactivation_timer_callback_pending_ && |
||||||
|
!self->priority_policy_->shutting_down_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[priority_lb %p] child %s (%p): deactivation timer fired, " |
||||||
|
"deleting child", |
||||||
|
self->priority_policy_.get(), self->name_.c_str(), self); |
||||||
|
} |
||||||
|
self->deactivation_timer_callback_pending_ = false; |
||||||
|
self->priority_policy_->DeleteChild(self); |
||||||
|
} |
||||||
|
self->Unref(DEBUG_LOCATION, "ChildPriority+timer"); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// PriorityLb::ChildPriority::Helper
|
||||||
|
//
|
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::Helper::RequestReresolution() { |
||||||
|
if (priority_->priority_policy_->shutting_down_) return; |
||||||
|
priority_->priority_policy_->channel_control_helper()->RequestReresolution(); |
||||||
|
} |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> |
||||||
|
PriorityLb::ChildPriority::Helper::CreateSubchannel( |
||||||
|
const grpc_channel_args& args) { |
||||||
|
if (priority_->priority_policy_->shutting_down_) return nullptr; |
||||||
|
return priority_->priority_policy_->channel_control_helper() |
||||||
|
->CreateSubchannel(args); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::Helper::UpdateState( |
||||||
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
if (priority_->priority_policy_->shutting_down_) return; |
||||||
|
// Notify the priority.
|
||||||
|
priority_->OnConnectivityStateUpdateLocked(state, std::move(picker)); |
||||||
|
} |
||||||
|
|
||||||
|
void PriorityLb::ChildPriority::Helper::AddTraceEvent(TraceSeverity severity, |
||||||
|
StringView message) { |
||||||
|
if (priority_->priority_policy_->shutting_down_) return; |
||||||
|
priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity, |
||||||
|
message); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// factory
|
||||||
|
//
|
||||||
|
|
||||||
|
class PriorityLbFactory : public LoadBalancingPolicyFactory { |
||||||
|
public: |
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||||
|
LoadBalancingPolicy::Args args) const override { |
||||||
|
return MakeOrphanable<PriorityLb>(std::move(args)); |
||||||
|
} |
||||||
|
|
||||||
|
const char* name() const override { return kPriority; } |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||||
|
const Json& json, grpc_error** error) const override { |
||||||
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||||
|
if (json.type() == Json::Type::JSON_NULL) { |
||||||
|
// priority was mentioned as a policy in the deprecated
|
||||||
|
// loadBalancingPolicy field or in the client API.
|
||||||
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:loadBalancingPolicy error:priority policy requires " |
||||||
|
"configuration. Please use loadBalancingConfig field of service " |
||||||
|
"config instead."); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
// Children.
|
||||||
|
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> children; |
||||||
|
auto it = json.object_value().find("children"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:children error:required field missing")); |
||||||
|
} else if (it->second.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:children error:type should be object")); |
||||||
|
} else { |
||||||
|
const Json::Object& object = it->second.object_value(); |
||||||
|
for (const auto& p : object) { |
||||||
|
const std::string& child_name = p.first; |
||||||
|
const Json& element = p.second; |
||||||
|
if (element.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:children key:", child_name, |
||||||
|
" error:should be type object") |
||||||
|
.c_str())); |
||||||
|
} else { |
||||||
|
auto it2 = element.object_value().find("config"); |
||||||
|
if (it2 == element.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:children key:", child_name, |
||||||
|
" error:missing 'config' field") |
||||||
|
.c_str())); |
||||||
|
} else { |
||||||
|
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||||
|
auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
||||||
|
it2->second, &parse_error); |
||||||
|
if (config == nullptr) { |
||||||
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:children key:", child_name).c_str(), |
||||||
|
&parse_error, 1)); |
||||||
|
GRPC_ERROR_UNREF(parse_error); |
||||||
|
} |
||||||
|
children[child_name] = std::move(config); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
// Priorities.
|
||||||
|
std::vector<std::string> priorities; |
||||||
|
it = json.object_value().find("priorities"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:priorities error:required field missing")); |
||||||
|
} else if (it->second.type() != Json::Type::ARRAY) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:priorities error:type should be array")); |
||||||
|
} else { |
||||||
|
const Json::Array& array = it->second.array_value(); |
||||||
|
for (size_t i = 0; i < array.size(); ++i) { |
||||||
|
const Json& element = array[i]; |
||||||
|
if (element.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:priorities element:", i, |
||||||
|
" error:should be type string") |
||||||
|
.c_str())); |
||||||
|
} else if (children.find(element.string_value()) == children.end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:priorities element:", i, |
||||||
|
" error:unknown child '", element.string_value(), |
||||||
|
"'") |
||||||
|
.c_str())); |
||||||
|
} else { |
||||||
|
priorities.emplace_back(element.string_value()); |
||||||
|
} |
||||||
|
} |
||||||
|
if (priorities.size() != children.size()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:priorities error:priorities size (", |
||||||
|
priorities.size(), ") != children size (", |
||||||
|
children.size(), ")") |
||||||
|
.c_str())); |
||||||
|
} |
||||||
|
} |
||||||
|
if (error_list.empty()) { |
||||||
|
return MakeRefCounted<PriorityLbConfig>(std::move(children), |
||||||
|
std::move(priorities)); |
||||||
|
} else { |
||||||
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||||
|
"priority_experimental LB policy config", &error_list); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
//
|
||||||
|
// Plugin registration
|
||||||
|
//
|
||||||
|
|
||||||
|
void grpc_lb_policy_priority_init() { |
||||||
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||||
|
RegisterLoadBalancingPolicyFactory( |
||||||
|
absl::make_unique<grpc_core::PriorityLbFactory>()); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_policy_priority_shutdown() {} |
@ -0,0 +1,722 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2018 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <inttypes.h> |
||||||
|
#include <limits.h> |
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include "absl/strings/str_cat.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gpr/string.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/combiner.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb"); |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
constexpr char kWeightedTarget[] = "weighted_target_experimental"; |
||||||
|
|
||||||
|
// How long we keep a child around for after it has been removed from
|
||||||
|
// the config.
|
||||||
|
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000; |
||||||
|
|
||||||
|
// Config for weighted_target LB policy.
|
||||||
|
class WeightedTargetLbConfig : public LoadBalancingPolicy::Config { |
||||||
|
public: |
||||||
|
struct ChildConfig { |
||||||
|
uint32_t weight; |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> config; |
||||||
|
}; |
||||||
|
|
||||||
|
using TargetMap = std::map<std::string, ChildConfig>; |
||||||
|
|
||||||
|
explicit WeightedTargetLbConfig(TargetMap target_map) |
||||||
|
: target_map_(std::move(target_map)) {} |
||||||
|
|
||||||
|
const char* name() const override { return kWeightedTarget; } |
||||||
|
|
||||||
|
const TargetMap& target_map() const { return target_map_; } |
||||||
|
|
||||||
|
private: |
||||||
|
TargetMap target_map_; |
||||||
|
}; |
||||||
|
|
||||||
|
// weighted_target LB policy.
|
||||||
|
class WeightedTargetLb : public LoadBalancingPolicy { |
||||||
|
public: |
||||||
|
explicit WeightedTargetLb(Args args); |
||||||
|
|
||||||
|
const char* name() const override { return kWeightedTarget; } |
||||||
|
|
||||||
|
void UpdateLocked(UpdateArgs args) override; |
||||||
|
void ResetBackoffLocked() override; |
||||||
|
|
||||||
|
private: |
||||||
|
// A simple wrapper for ref-counting a picker from the child policy.
|
||||||
|
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> { |
||||||
|
public: |
||||||
|
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker) |
||||||
|
: picker_(std::move(picker)) {} |
||||||
|
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||||
|
|
||||||
|
private: |
||||||
|
std::unique_ptr<SubchannelPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Picks a child using stateless WRR and then delegates to that
|
||||||
|
// child's picker.
|
||||||
|
class WeightedPicker : public SubchannelPicker { |
||||||
|
public: |
||||||
|
// Maintains a weighted list of pickers from each child that is in
|
||||||
|
// ready state. The first element in the pair represents the end of a
|
||||||
|
// range proportional to the child's weight. The start of the range
|
||||||
|
// is the previous value in the vector and is 0 for the first element.
|
||||||
|
using PickerList = |
||||||
|
InlinedVector<std::pair<uint32_t, RefCountedPtr<ChildPickerWrapper>>, |
||||||
|
1>; |
||||||
|
|
||||||
|
explicit WeightedPicker(PickerList pickers) |
||||||
|
: pickers_(std::move(pickers)) {} |
||||||
|
|
||||||
|
PickResult Pick(PickArgs args) override; |
||||||
|
|
||||||
|
private: |
||||||
|
PickerList pickers_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Each WeightedChild holds a ref to its parent WeightedTargetLb.
|
||||||
|
class WeightedChild : public InternallyRefCounted<WeightedChild> { |
||||||
|
public: |
||||||
|
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy, |
||||||
|
const std::string& name); |
||||||
|
~WeightedChild(); |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, |
||||||
|
ServerAddressList addresses, |
||||||
|
const grpc_channel_args* args); |
||||||
|
void ResetBackoffLocked(); |
||||||
|
void DeactivateLocked(); |
||||||
|
|
||||||
|
uint32_t weight() const { return weight_; } |
||||||
|
grpc_connectivity_state connectivity_state() const { |
||||||
|
return connectivity_state_; |
||||||
|
} |
||||||
|
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const { |
||||||
|
return picker_wrapper_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class Helper : public ChannelControlHelper { |
||||||
|
public: |
||||||
|
explicit Helper(RefCountedPtr<WeightedChild> weighted_child) |
||||||
|
: weighted_child_(std::move(weighted_child)) {} |
||||||
|
|
||||||
|
~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
const grpc_channel_args& args) override; |
||||||
|
void UpdateState(grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) override; |
||||||
|
void RequestReresolution() override; |
||||||
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<WeightedChild> weighted_child_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Methods for dealing with the child policy.
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args); |
||||||
|
|
||||||
|
void OnConnectivityStateUpdateLocked( |
||||||
|
grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker); |
||||||
|
|
||||||
|
static void OnDelayedRemovalTimer(void* arg, grpc_error* error); |
||||||
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
||||||
|
|
||||||
|
// The owning LB policy.
|
||||||
|
RefCountedPtr<WeightedTargetLb> weighted_target_policy_; |
||||||
|
|
||||||
|
const std::string& name_; |
||||||
|
|
||||||
|
uint32_t weight_; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||||
|
|
||||||
|
RefCountedPtr<ChildPickerWrapper> picker_wrapper_; |
||||||
|
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; |
||||||
|
bool seen_failure_since_ready_ = false; |
||||||
|
|
||||||
|
// States for delayed removal.
|
||||||
|
grpc_timer delayed_removal_timer_; |
||||||
|
grpc_closure on_delayed_removal_timer_; |
||||||
|
bool delayed_removal_timer_callback_pending_ = false; |
||||||
|
bool shutdown_ = false; |
||||||
|
}; |
||||||
|
|
||||||
|
~WeightedTargetLb(); |
||||||
|
|
||||||
|
void ShutdownLocked() override; |
||||||
|
|
||||||
|
void UpdateStateLocked(); |
||||||
|
|
||||||
|
// Current config from the resolver.
|
||||||
|
RefCountedPtr<WeightedTargetLbConfig> config_; |
||||||
|
|
||||||
|
// Internal state.
|
||||||
|
bool shutting_down_ = false; |
||||||
|
|
||||||
|
// Children.
|
||||||
|
std::map<std::string, OrphanablePtr<WeightedChild>> targets_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// WeightedTargetLb::WeightedPicker
|
||||||
|
//
|
||||||
|
|
||||||
|
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick( |
||||||
|
PickArgs args) { |
||||||
|
// Generate a random number in [0, total weight).
|
||||||
|
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; |
||||||
|
// Find the index in pickers_ corresponding to key.
|
||||||
|
size_t mid = 0; |
||||||
|
size_t start_index = 0; |
||||||
|
size_t end_index = pickers_.size() - 1; |
||||||
|
size_t index = 0; |
||||||
|
while (end_index > start_index) { |
||||||
|
mid = (start_index + end_index) / 2; |
||||||
|
if (pickers_[mid].first > key) { |
||||||
|
end_index = mid; |
||||||
|
} else if (pickers_[mid].first < key) { |
||||||
|
start_index = mid + 1; |
||||||
|
} else { |
||||||
|
index = mid + 1; |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
if (index == 0) index = start_index; |
||||||
|
GPR_ASSERT(pickers_[index].first > key); |
||||||
|
// Delegate to the child picker.
|
||||||
|
return pickers_[index].second->Pick(args); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// WeightedTargetLb
|
||||||
|
//
|
||||||
|
|
||||||
|
WeightedTargetLb::WeightedTargetLb(Args args) |
||||||
|
: LoadBalancingPolicy(std::move(args)) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
WeightedTargetLb::~WeightedTargetLb() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] destroying weighted_target LB policy", |
||||||
|
this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::ShutdownLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this); |
||||||
|
} |
||||||
|
shutting_down_ = true; |
||||||
|
targets_.clear(); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::ResetBackoffLocked() { |
||||||
|
for (auto& p : targets_) p.second->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::UpdateLocked(UpdateArgs args) { |
||||||
|
if (shutting_down_) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this); |
||||||
|
} |
||||||
|
// Update config.
|
||||||
|
config_ = std::move(args.config); |
||||||
|
// Deactivate the targets not in the new config.
|
||||||
|
for (const auto& p : targets_) { |
||||||
|
const std::string& name = p.first; |
||||||
|
WeightedChild* child = p.second.get(); |
||||||
|
if (config_->target_map().find(name) == config_->target_map().end()) { |
||||||
|
child->DeactivateLocked(); |
||||||
|
} |
||||||
|
} |
||||||
|
// Add or update the targets in the new config.
|
||||||
|
HierarchicalAddressMap address_map = |
||||||
|
MakeHierarchicalAddressMap(args.addresses); |
||||||
|
for (const auto& p : config_->target_map()) { |
||||||
|
const std::string& name = p.first; |
||||||
|
const WeightedTargetLbConfig::ChildConfig& config = p.second; |
||||||
|
auto it = targets_.find(name); |
||||||
|
if (it == targets_.end()) { |
||||||
|
it = targets_.emplace(std::make_pair(name, nullptr)).first; |
||||||
|
it->second = MakeOrphanable<WeightedChild>( |
||||||
|
Ref(DEBUG_LOCATION, "WeightedChild"), it->first); |
||||||
|
} |
||||||
|
it->second->UpdateLocked(config, std::move(address_map[name]), args.args); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::UpdateStateLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] scanning children to determine " |
||||||
|
"connectivity state", |
||||||
|
this); |
||||||
|
} |
||||||
|
// Construct a new picker which maintains a map of all child pickers
|
||||||
|
// that are ready. Each child is represented by a portion of the range
|
||||||
|
// proportional to its weight, such that the total range is the sum of the
|
||||||
|
// weights of all children.
|
||||||
|
WeightedPicker::PickerList picker_list; |
||||||
|
uint32_t end = 0; |
||||||
|
// Also count the number of children in each state, to determine the
|
||||||
|
// overall state.
|
||||||
|
size_t num_connecting = 0; |
||||||
|
size_t num_idle = 0; |
||||||
|
size_t num_transient_failures = 0; |
||||||
|
for (const auto& p : targets_) { |
||||||
|
const std::string& child_name = p.first; |
||||||
|
const WeightedChild* child = p.second.get(); |
||||||
|
// Skip the targets that are not in the latest update.
|
||||||
|
if (config_->target_map().find(child_name) == config_->target_map().end()) { |
||||||
|
continue; |
||||||
|
} |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] child=%s state=%s weight=%d picker=%p", |
||||||
|
this, child_name.c_str(), |
||||||
|
ConnectivityStateName(child->connectivity_state()), |
||||||
|
child->weight(), child->picker_wrapper().get()); |
||||||
|
} |
||||||
|
switch (child->connectivity_state()) { |
||||||
|
case GRPC_CHANNEL_READY: { |
||||||
|
end += child->weight(); |
||||||
|
picker_list.push_back(std::make_pair(end, child->picker_wrapper())); |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_CONNECTING: { |
||||||
|
++num_connecting; |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_IDLE: { |
||||||
|
++num_idle; |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
||||||
|
++num_transient_failures; |
||||||
|
break; |
||||||
|
} |
||||||
|
default: |
||||||
|
GPR_UNREACHABLE_CODE(return ); |
||||||
|
} |
||||||
|
} |
||||||
|
// Determine aggregated connectivity state.
|
||||||
|
grpc_connectivity_state connectivity_state; |
||||||
|
if (!picker_list.empty()) { |
||||||
|
connectivity_state = GRPC_CHANNEL_READY; |
||||||
|
} else if (num_connecting > 0) { |
||||||
|
connectivity_state = GRPC_CHANNEL_CONNECTING; |
||||||
|
} else if (num_idle > 0) { |
||||||
|
connectivity_state = GRPC_CHANNEL_IDLE; |
||||||
|
} else { |
||||||
|
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
||||||
|
} |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s", |
||||||
|
this, ConnectivityStateName(connectivity_state)); |
||||||
|
} |
||||||
|
std::unique_ptr<SubchannelPicker> picker; |
||||||
|
switch (connectivity_state) { |
||||||
|
case GRPC_CHANNEL_READY: |
||||||
|
picker = absl::make_unique<WeightedPicker>(std::move(picker_list)); |
||||||
|
break; |
||||||
|
case GRPC_CHANNEL_CONNECTING: |
||||||
|
case GRPC_CHANNEL_IDLE: |
||||||
|
picker = |
||||||
|
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")); |
||||||
|
break; |
||||||
|
default: |
||||||
|
picker = absl::make_unique<TransientFailurePicker>( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"weighted_target: all children report state TRANSIENT_FAILURE")); |
||||||
|
} |
||||||
|
channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// WeightedTargetLb::WeightedChild
|
||||||
|
//
|
||||||
|
|
||||||
|
WeightedTargetLb::WeightedChild::WeightedChild( |
||||||
|
RefCountedPtr<WeightedTargetLb> weighted_target_policy, |
||||||
|
const std::string& name) |
||||||
|
: weighted_target_policy_(std::move(weighted_target_policy)), name_(name) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
WeightedTargetLb::WeightedChild::~WeightedChild() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: destroying child", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild"); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::Orphan() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: shutting down child", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
// Remove the child policy's interested_parties pollset_set from the
|
||||||
|
// xDS policy.
|
||||||
|
grpc_pollset_set_del_pollset_set( |
||||||
|
child_policy_->interested_parties(), |
||||||
|
weighted_target_policy_->interested_parties()); |
||||||
|
child_policy_.reset(); |
||||||
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||||
|
// the child.
|
||||||
|
picker_wrapper_.reset(); |
||||||
|
if (delayed_removal_timer_callback_pending_) { |
||||||
|
delayed_removal_timer_callback_pending_ = false; |
||||||
|
grpc_timer_cancel(&delayed_removal_timer_); |
||||||
|
} |
||||||
|
shutdown_ = true; |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> |
||||||
|
WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args) { |
||||||
|
LoadBalancingPolicy::Args lb_policy_args; |
||||||
|
lb_policy_args.combiner = weighted_target_policy_->combiner(); |
||||||
|
lb_policy_args.args = args; |
||||||
|
lb_policy_args.channel_control_helper = |
||||||
|
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||||
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||||
|
&grpc_lb_weighted_target_trace); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: Created new child " |
||||||
|
"policy handler %p", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str(), |
||||||
|
lb_policy.get()); |
||||||
|
} |
||||||
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
||||||
|
// child policy. This will make the child policy progress upon activity on
|
||||||
|
// xDS LB, which in turn is tied to the application's call.
|
||||||
|
grpc_pollset_set_add_pollset_set( |
||||||
|
lb_policy->interested_parties(), |
||||||
|
weighted_target_policy_->interested_parties()); |
||||||
|
return lb_policy; |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::UpdateLocked( |
||||||
|
const WeightedTargetLbConfig::ChildConfig& config, |
||||||
|
ServerAddressList addresses, const grpc_channel_args* args) { |
||||||
|
if (weighted_target_policy_->shutting_down_) return; |
||||||
|
// Update child weight.
|
||||||
|
weight_ = config.weight; |
||||||
|
// Reactivate if needed.
|
||||||
|
if (delayed_removal_timer_callback_pending_) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: reactivating", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
delayed_removal_timer_callback_pending_ = false; |
||||||
|
grpc_timer_cancel(&delayed_removal_timer_); |
||||||
|
} |
||||||
|
// Create child policy if needed.
|
||||||
|
if (child_policy_ == nullptr) { |
||||||
|
child_policy_ = CreateChildPolicyLocked(args); |
||||||
|
} |
||||||
|
// Construct update args.
|
||||||
|
UpdateArgs update_args; |
||||||
|
update_args.config = config.config; |
||||||
|
update_args.addresses = std::move(addresses); |
||||||
|
update_args.args = grpc_channel_args_copy(args); |
||||||
|
// Update the policy.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: Updating child " |
||||||
|
"policy handler %p", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str(), |
||||||
|
child_policy_.get()); |
||||||
|
} |
||||||
|
child_policy_->UpdateLocked(std::move(update_args)); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::ResetBackoffLocked() { |
||||||
|
child_policy_->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked( |
||||||
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
// Cache the picker in the WeightedChild.
|
||||||
|
picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker)); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: connectivity " |
||||||
|
"state update: state=%s picker_wrapper=%p", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str(), |
||||||
|
ConnectivityStateName(state), picker_wrapper_.get()); |
||||||
|
} |
||||||
|
// If the child reports IDLE, immediately tell it to exit idle.
|
||||||
|
if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked(); |
||||||
|
// Decide what state to report for aggregation purposes.
|
||||||
|
// If we haven't seen a failure since the last time we were in state
|
||||||
|
// READY, then we report the state change as-is. However, once we do see
|
||||||
|
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
|
||||||
|
// changes until we go back into state READY.
|
||||||
|
if (!seen_failure_since_ready_) { |
||||||
|
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||||
|
seen_failure_since_ready_ = true; |
||||||
|
} |
||||||
|
} else { |
||||||
|
if (state != GRPC_CHANNEL_READY) return; |
||||||
|
seen_failure_since_ready_ = false; |
||||||
|
} |
||||||
|
connectivity_state_ = state; |
||||||
|
// Notify the LB policy.
|
||||||
|
weighted_target_policy_->UpdateStateLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::DeactivateLocked() { |
||||||
|
// If already deactivated, don't do that again.
|
||||||
|
if (weight_ == 0) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[weighted_target_lb %p] WeightedChild %p %s: deactivating", |
||||||
|
weighted_target_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
// Set the child weight to 0 so that future picker won't contain this child.
|
||||||
|
weight_ = 0; |
||||||
|
// Start a timer to delete the child.
|
||||||
|
Ref(DEBUG_LOCATION, "WeightedChild+timer").release(); |
||||||
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
delayed_removal_timer_callback_pending_ = true; |
||||||
|
grpc_timer_init(&delayed_removal_timer_, |
||||||
|
ExecCtx::Get()->Now() + kChildRetentionIntervalMs, |
||||||
|
&on_delayed_removal_timer_); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg, |
||||||
|
grpc_error* error) { |
||||||
|
WeightedChild* self = static_cast<WeightedChild*>(arg); |
||||||
|
self->weighted_target_policy_->combiner()->Run( |
||||||
|
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, |
||||||
|
OnDelayedRemovalTimerLocked, self, nullptr), |
||||||
|
GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked( |
||||||
|
void* arg, grpc_error* error) { |
||||||
|
WeightedChild* self = static_cast<WeightedChild*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE && |
||||||
|
self->delayed_removal_timer_callback_pending_ && !self->shutdown_ && |
||||||
|
self->weight_ == 0) { |
||||||
|
self->delayed_removal_timer_callback_pending_ = false; |
||||||
|
self->weighted_target_policy_->targets_.erase(self->name_); |
||||||
|
} |
||||||
|
self->Unref(DEBUG_LOCATION, "WeightedChild+timer"); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// WeightedTargetLb::WeightedChild::Helper
|
||||||
|
//
|
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> |
||||||
|
WeightedTargetLb::WeightedChild::Helper::CreateSubchannel( |
||||||
|
const grpc_channel_args& args) { |
||||||
|
if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr; |
||||||
|
return weighted_child_->weighted_target_policy_->channel_control_helper() |
||||||
|
->CreateSubchannel(args); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::Helper::UpdateState( |
||||||
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||||
|
weighted_child_->OnConnectivityStateUpdateLocked(state, std::move(picker)); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() { |
||||||
|
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||||
|
weighted_child_->weighted_target_policy_->channel_control_helper() |
||||||
|
->RequestReresolution(); |
||||||
|
} |
||||||
|
|
||||||
|
void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent( |
||||||
|
TraceSeverity severity, StringView message) { |
||||||
|
if (weighted_child_->weighted_target_policy_->shutting_down_) return; |
||||||
|
weighted_child_->weighted_target_policy_->channel_control_helper() |
||||||
|
->AddTraceEvent(severity, message); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// factory
|
||||||
|
//
|
||||||
|
|
||||||
|
class WeightedTargetLbFactory : public LoadBalancingPolicyFactory { |
||||||
|
public: |
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||||
|
LoadBalancingPolicy::Args args) const override { |
||||||
|
return MakeOrphanable<WeightedTargetLb>(std::move(args)); |
||||||
|
} |
||||||
|
|
||||||
|
const char* name() const override { return kWeightedTarget; } |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||||
|
const Json& json, grpc_error** error) const override { |
||||||
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||||
|
if (json.type() == Json::Type::JSON_NULL) { |
||||||
|
// weighted_target was mentioned as a policy in the deprecated
|
||||||
|
// loadBalancingPolicy field or in the client API.
|
||||||
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:loadBalancingPolicy error:weighted_target policy requires " |
||||||
|
"configuration. Please use loadBalancingConfig field of service " |
||||||
|
"config instead."); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
// Weight map.
|
||||||
|
WeightedTargetLbConfig::TargetMap target_map; |
||||||
|
auto it = json.object_value().find("targets"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:targets error:required field not present")); |
||||||
|
} else if (it->second.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:targets error:type should be object")); |
||||||
|
} else { |
||||||
|
for (const auto& p : it->second.object_value()) { |
||||||
|
WeightedTargetLbConfig::ChildConfig child_config; |
||||||
|
std::vector<grpc_error*> child_errors = |
||||||
|
ParseChildConfig(p.second, &child_config); |
||||||
|
if (!child_errors.empty()) { |
||||||
|
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
|
||||||
|
// string is not static in this case.
|
||||||
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:targets key:", p.first).c_str()); |
||||||
|
for (grpc_error* child_error : child_errors) { |
||||||
|
error = grpc_error_add_child(error, child_error); |
||||||
|
} |
||||||
|
error_list.push_back(error); |
||||||
|
} else { |
||||||
|
target_map[p.first] = std::move(child_config); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
if (!error_list.empty()) { |
||||||
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||||
|
"weighted_target_experimental LB policy config", &error_list); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
static std::vector<grpc_error*> ParseChildConfig( |
||||||
|
const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) { |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
if (json.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"value should be of type object")); |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
// Weight.
|
||||||
|
auto it = json.object_value().find("weight"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"required field \"weight\" not specified")); |
||||||
|
} else if (it->second.type() != Json::Type::NUMBER) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:weight error:must be of type number")); |
||||||
|
} else { |
||||||
|
child_config->weight = |
||||||
|
gpr_parse_nonnegative_int(it->second.string_value().c_str()); |
||||||
|
if (child_config->weight == -1) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:weight error:unparseable value")); |
||||||
|
} else if (child_config->weight == 0) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:weight error:value must be greater than zero")); |
||||||
|
} |
||||||
|
} |
||||||
|
// Child policy.
|
||||||
|
it = json.object_value().find("childPolicy"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||||
|
child_config->config = |
||||||
|
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second, |
||||||
|
&parse_error); |
||||||
|
if (child_config->config == nullptr) { |
||||||
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||||
|
std::vector<grpc_error*> child_errors; |
||||||
|
child_errors.push_back(parse_error); |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
||||||
|
} |
||||||
|
} |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
//
|
||||||
|
// Plugin registration
|
||||||
|
//
|
||||||
|
|
||||||
|
void grpc_lb_policy_weighted_target_init() { |
||||||
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||||
|
RegisterLoadBalancingPolicyFactory( |
||||||
|
absl::make_unique<grpc_core::WeightedTargetLbFactory>()); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_policy_weighted_target_shutdown() {} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,524 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2018 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||||
|
#include "src/core/ext/filters/client_channel/xds/xds_client.h" |
||||||
|
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/combiner.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_lb_lrs_trace(false, "lrs_lb"); |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
constexpr char kLrs[] = "lrs_experimental"; |
||||||
|
|
||||||
|
// Config for LRS LB policy.
|
||||||
|
class LrsLbConfig : public LoadBalancingPolicy::Config { |
||||||
|
public: |
||||||
|
LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy, |
||||||
|
std::string cluster_name, std::string eds_service_name, |
||||||
|
std::string lrs_load_reporting_server_name, |
||||||
|
RefCountedPtr<XdsLocalityName> locality_name) |
||||||
|
: child_policy_(std::move(child_policy)), |
||||||
|
cluster_name_(std::move(cluster_name)), |
||||||
|
eds_service_name_(std::move(eds_service_name)), |
||||||
|
lrs_load_reporting_server_name_( |
||||||
|
std::move(lrs_load_reporting_server_name)), |
||||||
|
locality_name_(std::move(locality_name)) {} |
||||||
|
|
||||||
|
const char* name() const override { return kLrs; } |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const { |
||||||
|
return child_policy_; |
||||||
|
} |
||||||
|
const std::string& cluster_name() const { return cluster_name_; } |
||||||
|
const std::string& eds_service_name() const { return eds_service_name_; } |
||||||
|
const std::string& lrs_load_reporting_server_name() const { |
||||||
|
return lrs_load_reporting_server_name_; |
||||||
|
}; |
||||||
|
RefCountedPtr<XdsLocalityName> locality_name() const { |
||||||
|
return locality_name_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_; |
||||||
|
std::string cluster_name_; |
||||||
|
std::string eds_service_name_; |
||||||
|
std::string lrs_load_reporting_server_name_; |
||||||
|
RefCountedPtr<XdsLocalityName> locality_name_; |
||||||
|
}; |
||||||
|
|
||||||
|
// LRS LB policy.
|
||||||
|
class LrsLb : public LoadBalancingPolicy { |
||||||
|
public: |
||||||
|
LrsLb(RefCountedPtr<XdsClient> xds_client, Args args); |
||||||
|
|
||||||
|
const char* name() const override { return kLrs; } |
||||||
|
|
||||||
|
void UpdateLocked(UpdateArgs args) override; |
||||||
|
void ExitIdleLocked() override; |
||||||
|
void ResetBackoffLocked() override; |
||||||
|
|
||||||
|
private: |
||||||
|
// A simple wrapper for ref-counting a picker from the child policy.
|
||||||
|
class RefCountedPicker : public RefCounted<RefCountedPicker> { |
||||||
|
public: |
||||||
|
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker) |
||||||
|
: picker_(std::move(picker)) {} |
||||||
|
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||||
|
|
||||||
|
private: |
||||||
|
std::unique_ptr<SubchannelPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
// A picker that wraps the picker from the child to perform load reporting.
|
||||||
|
class LoadReportingPicker : public SubchannelPicker { |
||||||
|
public: |
||||||
|
LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker, |
||||||
|
RefCountedPtr<XdsClusterLocalityStats> locality_stats) |
||||||
|
: picker_(std::move(picker)), |
||||||
|
locality_stats_(std::move(locality_stats)) {} |
||||||
|
|
||||||
|
PickResult Pick(PickArgs args); |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<RefCountedPicker> picker_; |
||||||
|
RefCountedPtr<XdsClusterLocalityStats> locality_stats_; |
||||||
|
}; |
||||||
|
|
||||||
|
class Helper : public ChannelControlHelper { |
||||||
|
public: |
||||||
|
explicit Helper(RefCountedPtr<LrsLb> lrs_policy) |
||||||
|
: lrs_policy_(std::move(lrs_policy)) {} |
||||||
|
|
||||||
|
~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
const grpc_channel_args& args) override; |
||||||
|
void UpdateState(grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) override; |
||||||
|
void RequestReresolution() override; |
||||||
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<LrsLb> lrs_policy_; |
||||||
|
}; |
||||||
|
|
||||||
|
~LrsLb(); |
||||||
|
|
||||||
|
void ShutdownLocked() override; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args); |
||||||
|
void UpdateChildPolicyLocked(ServerAddressList addresses, |
||||||
|
const grpc_channel_args* args); |
||||||
|
|
||||||
|
void MaybeUpdatePickerLocked(); |
||||||
|
|
||||||
|
// Current config from the resolver.
|
||||||
|
RefCountedPtr<LrsLbConfig> config_; |
||||||
|
|
||||||
|
// Internal state.
|
||||||
|
bool shutting_down_ = false; |
||||||
|
|
||||||
|
// The xds client.
|
||||||
|
RefCountedPtr<XdsClient> xds_client_; |
||||||
|
|
||||||
|
// The stats for client-side load reporting.
|
||||||
|
RefCountedPtr<XdsClusterLocalityStats> locality_stats_; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||||
|
|
||||||
|
// Latest state and picker reported by the child policy.
|
||||||
|
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; |
||||||
|
RefCountedPtr<RefCountedPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// LrsLb::LoadReportingPicker
|
||||||
|
//
|
||||||
|
|
||||||
|
LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick( |
||||||
|
LoadBalancingPolicy::PickArgs args) { |
||||||
|
// Forward the pick to the picker returned from the child policy.
|
||||||
|
PickResult result = picker_->Pick(args); |
||||||
|
if (result.type == PickResult::PICK_COMPLETE && |
||||||
|
result.subchannel != nullptr) { |
||||||
|
// Record a call started.
|
||||||
|
locality_stats_->AddCallStarted(); |
||||||
|
// Intercept the recv_trailing_metadata op to record call completion.
|
||||||
|
XdsClusterLocalityStats* locality_stats = |
||||||
|
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release(); |
||||||
|
result.recv_trailing_metadata_ready = |
||||||
|
// Note: This callback does not run in either the control plane
|
||||||
|
// combiner or in the data plane mutex.
|
||||||
|
[locality_stats](grpc_error* error, MetadataInterface* /*metadata*/, |
||||||
|
CallState* /*call_state*/) { |
||||||
|
const bool call_failed = error != GRPC_ERROR_NONE; |
||||||
|
locality_stats->AddCallFinished(call_failed); |
||||||
|
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call"); |
||||||
|
}; |
||||||
|
} |
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// LrsLb
|
||||||
|
//
|
||||||
|
|
||||||
|
LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args) |
||||||
|
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel", |
||||||
|
this, xds_client_.get()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
LrsLb::~LrsLb() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] destroying xds LB policy", this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::ShutdownLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] shutting down", this); |
||||||
|
} |
||||||
|
shutting_down_ = true; |
||||||
|
// Remove the child policy's interested_parties pollset_set from the
|
||||||
|
// xDS policy.
|
||||||
|
if (child_policy_ != nullptr) { |
||||||
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
||||||
|
interested_parties()); |
||||||
|
child_policy_.reset(); |
||||||
|
} |
||||||
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||||
|
// the child.
|
||||||
|
picker_.reset(); |
||||||
|
locality_stats_.reset(); |
||||||
|
xds_client_.reset(); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::ExitIdleLocked() { |
||||||
|
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::ResetBackoffLocked() { |
||||||
|
// The XdsClient will have its backoff reset by the xds resolver, so we
|
||||||
|
// don't need to do it here.
|
||||||
|
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::UpdateLocked(UpdateArgs args) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] Received update", this); |
||||||
|
} |
||||||
|
// Update config.
|
||||||
|
auto old_config = std::move(config_); |
||||||
|
config_ = std::move(args.config); |
||||||
|
// Update load reporting if needed.
|
||||||
|
if (old_config == nullptr || |
||||||
|
config_->lrs_load_reporting_server_name() != |
||||||
|
old_config->lrs_load_reporting_server_name() || |
||||||
|
config_->cluster_name() != old_config->cluster_name() || |
||||||
|
config_->eds_service_name() != old_config->eds_service_name() || |
||||||
|
*config_->locality_name() != *old_config->locality_name()) { |
||||||
|
locality_stats_ = xds_client_->AddClusterLocalityStats( |
||||||
|
config_->lrs_load_reporting_server_name(), config_->cluster_name(), |
||||||
|
config_->eds_service_name(), config_->locality_name()); |
||||||
|
MaybeUpdatePickerLocked(); |
||||||
|
} |
||||||
|
// Update child policy.
|
||||||
|
UpdateChildPolicyLocked(std::move(args.addresses), args.args); |
||||||
|
args.args = nullptr; // Ownership passed to UpdateChildPolicyLocked().
|
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::MaybeUpdatePickerLocked() { |
||||||
|
if (picker_ != nullptr) { |
||||||
|
auto lrs_picker = |
||||||
|
absl::make_unique<LoadReportingPicker>(picker_, locality_stats_); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] updating connectivity: state=%s picker=%p", |
||||||
|
this, ConnectivityStateName(state_), lrs_picker.get()); |
||||||
|
} |
||||||
|
channel_control_helper()->UpdateState(state_, std::move(lrs_picker)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args) { |
||||||
|
LoadBalancingPolicy::Args lb_policy_args; |
||||||
|
lb_policy_args.combiner = combiner(); |
||||||
|
lb_policy_args.args = args; |
||||||
|
lb_policy_args.channel_control_helper = |
||||||
|
absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||||
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||||
|
&grpc_lb_lrs_trace); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] Created new child policy handler %p", this, |
||||||
|
lb_policy.get()); |
||||||
|
} |
||||||
|
// Add our interested_parties pollset_set to that of the newly created
|
||||||
|
// child policy. This will make the child policy progress upon activity on
|
||||||
|
// this policy, which in turn is tied to the application's call.
|
||||||
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
||||||
|
interested_parties()); |
||||||
|
return lb_policy; |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses, |
||||||
|
const grpc_channel_args* args) { |
||||||
|
// Create policy if needed.
|
||||||
|
if (child_policy_ == nullptr) { |
||||||
|
child_policy_ = CreateChildPolicyLocked(args); |
||||||
|
} |
||||||
|
// Construct update args.
|
||||||
|
UpdateArgs update_args; |
||||||
|
update_args.addresses = std::move(addresses); |
||||||
|
update_args.config = config_->child_policy(); |
||||||
|
update_args.args = args; |
||||||
|
// Update the policy.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[lrs_lb %p] Updating child policy handler %p", this, |
||||||
|
child_policy_.get()); |
||||||
|
} |
||||||
|
child_policy_->UpdateLocked(std::move(update_args)); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// LrsLb::Helper
|
||||||
|
//
|
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel( |
||||||
|
const grpc_channel_args& args) { |
||||||
|
if (lrs_policy_->shutting_down_) return nullptr; |
||||||
|
return lrs_policy_->channel_control_helper()->CreateSubchannel(args); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::Helper::UpdateState(grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
if (lrs_policy_->shutting_down_) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[lrs_lb %p] child connectivity state update: state=%s picker=%p", |
||||||
|
lrs_policy_.get(), ConnectivityStateName(state), picker.get()); |
||||||
|
} |
||||||
|
// Save the state and picker.
|
||||||
|
lrs_policy_->state_ = state; |
||||||
|
lrs_policy_->picker_ = MakeRefCounted<RefCountedPicker>(std::move(picker)); |
||||||
|
// Wrap the picker and return it to the channel.
|
||||||
|
lrs_policy_->MaybeUpdatePickerLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::Helper::RequestReresolution() { |
||||||
|
if (lrs_policy_->shutting_down_) return; |
||||||
|
lrs_policy_->channel_control_helper()->RequestReresolution(); |
||||||
|
} |
||||||
|
|
||||||
|
void LrsLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { |
||||||
|
if (lrs_policy_->shutting_down_) return; |
||||||
|
lrs_policy_->channel_control_helper()->AddTraceEvent(severity, message); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// factory
|
||||||
|
//
|
||||||
|
|
||||||
|
class LrsLbFactory : public LoadBalancingPolicyFactory { |
||||||
|
public: |
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||||
|
LoadBalancingPolicy::Args args) const override { |
||||||
|
RefCountedPtr<XdsClient> xds_client = |
||||||
|
XdsClient::GetFromChannelArgs(*args.args); |
||||||
|
if (xds_client == nullptr) { |
||||||
|
gpr_log(GPR_ERROR, |
||||||
|
"XdsClient not present in channel args -- cannot instantiate " |
||||||
|
"lrs LB policy"); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args)); |
||||||
|
} |
||||||
|
|
||||||
|
const char* name() const override { return kLrs; } |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||||
|
const Json& json, grpc_error** error) const override { |
||||||
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||||
|
if (json.type() == Json::Type::JSON_NULL) { |
||||||
|
// lrs was mentioned as a policy in the deprecated loadBalancingPolicy
|
||||||
|
// field or in the client API.
|
||||||
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:loadBalancingPolicy error:lrs policy requires configuration. " |
||||||
|
"Please use loadBalancingConfig field of service config instead."); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
// Child policy.
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> child_policy; |
||||||
|
auto it = json.object_value().find("childPolicy"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:childPolicy error:required field missing")); |
||||||
|
} else { |
||||||
|
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||||
|
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
||||||
|
it->second, &parse_error); |
||||||
|
if (child_policy == nullptr) { |
||||||
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||||
|
std::vector<grpc_error*> child_errors; |
||||||
|
child_errors.push_back(parse_error); |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
||||||
|
} |
||||||
|
} |
||||||
|
// Cluster name.
|
||||||
|
std::string cluster_name; |
||||||
|
it = json.object_value().find("clusterName"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:clusterName error:required field missing")); |
||||||
|
} else if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:clusterName error:type should be string")); |
||||||
|
} else { |
||||||
|
cluster_name = it->second.string_value(); |
||||||
|
} |
||||||
|
// EDS service name.
|
||||||
|
std::string eds_service_name; |
||||||
|
it = json.object_value().find("edsServiceName"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:edsServiceName error:type should be string")); |
||||||
|
} else { |
||||||
|
eds_service_name = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
// Locality.
|
||||||
|
RefCountedPtr<XdsLocalityName> locality_name; |
||||||
|
it = json.object_value().find("locality"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:locality error:required field missing")); |
||||||
|
} else { |
||||||
|
std::vector<grpc_error*> child_errors = |
||||||
|
ParseLocality(it->second, &locality_name); |
||||||
|
if (!child_errors.empty()) { |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_VECTOR("field:locality", &child_errors)); |
||||||
|
} |
||||||
|
} |
||||||
|
// LRS load reporting server name.
|
||||||
|
std::string lrs_load_reporting_server_name; |
||||||
|
it = json.object_value().find("lrsLoadReportingServerName"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:lrsLoadReportingServerName error:required field missing")); |
||||||
|
} else if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:lrsLoadReportingServerName error:type should be string")); |
||||||
|
} else { |
||||||
|
lrs_load_reporting_server_name = it->second.string_value(); |
||||||
|
} |
||||||
|
if (!error_list.empty()) { |
||||||
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||||
|
"lrs_experimental LB policy config", &error_list); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
return MakeRefCounted<LrsLbConfig>( |
||||||
|
std::move(child_policy), std::move(cluster_name), |
||||||
|
std::move(eds_service_name), std::move(lrs_load_reporting_server_name), |
||||||
|
std::move(locality_name)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
static std::vector<grpc_error*> ParseLocality( |
||||||
|
const Json& json, RefCountedPtr<XdsLocalityName>* name) { |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
if (json.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"locality field is not an object")); |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
std::string region; |
||||||
|
auto it = json.object_value().find("region"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"\"region\" field is not a string")); |
||||||
|
} else { |
||||||
|
region = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
std::string zone; |
||||||
|
it = json.object_value().find("zone"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"\"zone\" field is not a string")); |
||||||
|
} else { |
||||||
|
zone = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
std::string subzone; |
||||||
|
it = json.object_value().find("subzone"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"\"subzone\" field is not a string")); |
||||||
|
} else { |
||||||
|
subzone = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
if (region.empty() && zone.empty() && subzone.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"at least one of region, zone, or subzone must be set")); |
||||||
|
} |
||||||
|
if (error_list.empty()) { |
||||||
|
*name = MakeRefCounted<XdsLocalityName>(region, zone, subzone); |
||||||
|
} |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
//
|
||||||
|
// Plugin registration
|
||||||
|
//
|
||||||
|
|
||||||
|
void grpc_lb_policy_lrs_init() { |
||||||
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||||
|
RegisterLoadBalancingPolicyFactory( |
||||||
|
absl::make_unique<grpc_core::LrsLbFactory>()); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_policy_lrs_shutdown() {} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,159 @@ |
|||||||
|
# Copyright 2020 The gRPC Authors |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
"""Tests behavior of the wait for connection API on client side.""" |
||||||
|
|
||||||
|
import asyncio |
||||||
|
import logging |
||||||
|
import unittest |
||||||
|
import datetime |
||||||
|
from typing import Callable, Tuple |
||||||
|
|
||||||
|
import grpc |
||||||
|
from grpc.experimental import aio |
||||||
|
|
||||||
|
from tests_aio.unit._test_base import AioTestBase |
||||||
|
from tests_aio.unit._test_server import start_test_server |
||||||
|
from tests_aio.unit import _common |
||||||
|
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc |
||||||
|
from tests_aio.unit._constants import UNREACHABLE_TARGET |
||||||
|
|
||||||
|
_REQUEST = b'\x01\x02\x03' |
||||||
|
_TEST_METHOD = '/test/Test' |
||||||
|
|
||||||
|
_NUM_STREAM_RESPONSES = 5 |
||||||
|
_REQUEST_PAYLOAD_SIZE = 7 |
||||||
|
_RESPONSE_PAYLOAD_SIZE = 42 |
||||||
|
|
||||||
|
|
||||||
|
class TestWaitForConnection(AioTestBase): |
||||||
|
"""Tests if wait_for_connection raises connectivity issue.""" |
||||||
|
|
||||||
|
async def setUp(self): |
||||||
|
address, self._server = await start_test_server() |
||||||
|
self._channel = aio.insecure_channel(address) |
||||||
|
self._dummy_channel = aio.insecure_channel(UNREACHABLE_TARGET) |
||||||
|
self._stub = test_pb2_grpc.TestServiceStub(self._channel) |
||||||
|
|
||||||
|
async def tearDown(self): |
||||||
|
await self._dummy_channel.close() |
||||||
|
await self._channel.close() |
||||||
|
await self._server.stop(None) |
||||||
|
|
||||||
|
async def test_unary_unary_ok(self): |
||||||
|
call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) |
||||||
|
|
||||||
|
# No exception raised and no message swallowed. |
||||||
|
await call.wait_for_connection() |
||||||
|
|
||||||
|
response = await call |
||||||
|
self.assertIsInstance(response, messages_pb2.SimpleResponse) |
||||||
|
|
||||||
|
async def test_unary_stream_ok(self): |
||||||
|
request = messages_pb2.StreamingOutputCallRequest() |
||||||
|
for _ in range(_NUM_STREAM_RESPONSES): |
||||||
|
request.response_parameters.append( |
||||||
|
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) |
||||||
|
|
||||||
|
call = self._stub.StreamingOutputCall(request) |
||||||
|
|
||||||
|
# No exception raised and no message swallowed. |
||||||
|
await call.wait_for_connection() |
||||||
|
|
||||||
|
response_cnt = 0 |
||||||
|
async for response in call: |
||||||
|
response_cnt += 1 |
||||||
|
self.assertIs(type(response), |
||||||
|
messages_pb2.StreamingOutputCallResponse) |
||||||
|
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) |
||||||
|
|
||||||
|
self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt) |
||||||
|
self.assertEqual(await call.code(), grpc.StatusCode.OK) |
||||||
|
|
||||||
|
async def test_stream_unary_ok(self): |
||||||
|
call = self._stub.StreamingInputCall() |
||||||
|
|
||||||
|
# No exception raised and no message swallowed. |
||||||
|
await call.wait_for_connection() |
||||||
|
|
||||||
|
payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) |
||||||
|
request = messages_pb2.StreamingInputCallRequest(payload=payload) |
||||||
|
|
||||||
|
for _ in range(_NUM_STREAM_RESPONSES): |
||||||
|
await call.write(request) |
||||||
|
await call.done_writing() |
||||||
|
|
||||||
|
response = await call |
||||||
|
self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse) |
||||||
|
self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, |
||||||
|
response.aggregated_payload_size) |
||||||
|
|
||||||
|
self.assertEqual(await call.code(), grpc.StatusCode.OK) |
||||||
|
|
||||||
|
async def test_stream_stream_ok(self): |
||||||
|
call = self._stub.FullDuplexCall() |
||||||
|
|
||||||
|
# No exception raised and no message swallowed. |
||||||
|
await call.wait_for_connection() |
||||||
|
|
||||||
|
request = messages_pb2.StreamingOutputCallRequest() |
||||||
|
request.response_parameters.append( |
||||||
|
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) |
||||||
|
|
||||||
|
for _ in range(_NUM_STREAM_RESPONSES): |
||||||
|
await call.write(request) |
||||||
|
response = await call.read() |
||||||
|
self.assertIsInstance(response, |
||||||
|
messages_pb2.StreamingOutputCallResponse) |
||||||
|
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) |
||||||
|
|
||||||
|
await call.done_writing() |
||||||
|
|
||||||
|
self.assertEqual(grpc.StatusCode.OK, await call.code()) |
||||||
|
|
||||||
|
async def test_unary_unary_error(self): |
||||||
|
call = self._dummy_channel.unary_unary(_TEST_METHOD)(_REQUEST) |
||||||
|
|
||||||
|
with self.assertRaises(aio.AioRpcError) as exception_context: |
||||||
|
await call.wait_for_connection() |
||||||
|
rpc_error = exception_context.exception |
||||||
|
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) |
||||||
|
|
||||||
|
async def test_unary_stream_error(self): |
||||||
|
call = self._dummy_channel.unary_stream(_TEST_METHOD)(_REQUEST) |
||||||
|
|
||||||
|
with self.assertRaises(aio.AioRpcError) as exception_context: |
||||||
|
await call.wait_for_connection() |
||||||
|
rpc_error = exception_context.exception |
||||||
|
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) |
||||||
|
|
||||||
|
async def test_stream_unary_error(self): |
||||||
|
call = self._dummy_channel.stream_unary(_TEST_METHOD)() |
||||||
|
|
||||||
|
with self.assertRaises(aio.AioRpcError) as exception_context: |
||||||
|
await call.wait_for_connection() |
||||||
|
rpc_error = exception_context.exception |
||||||
|
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) |
||||||
|
|
||||||
|
async def test_stream_stream_error(self): |
||||||
|
call = self._dummy_channel.stream_stream(_TEST_METHOD)() |
||||||
|
|
||||||
|
with self.assertRaises(aio.AioRpcError) as exception_context: |
||||||
|
await call.wait_for_connection() |
||||||
|
rpc_error = exception_context.exception |
||||||
|
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
logging.basicConfig(level=logging.DEBUG) |
||||||
|
unittest.main(verbosity=2) |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue