[ring_hash] fix propagation of channel args to pick_first child in updates (#33568)

I missed this in #33093.
pull/33573/head
Mark D. Roth 2 years ago committed by GitHub
parent 51e54ed636
commit 21f2eba143
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 26
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc

@ -230,7 +230,6 @@ class PickFirst : public LoadBalancingPolicy {
// Backpointer to owning policy.
RefCountedPtr<PickFirst> policy_;
const bool enable_health_watch_;
ChannelArgs args_;
// The list of subchannels.
@ -285,6 +284,8 @@ class PickFirst : public LoadBalancingPolicy {
void UnsetSelectedSubchannel();
// Whether we should enable health watching.
const bool enable_health_watch_;
// Whether we should omit our status message prefix.
const bool omit_status_message_prefix_;
// Lateset update args.
@ -309,6 +310,10 @@ class PickFirst : public LoadBalancingPolicy {
PickFirst::PickFirst(Args args)
: LoadBalancingPolicy(std::move(args)),
enable_health_watch_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
.value_or(false)),
omit_status_message_prefix_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
@ -742,7 +747,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
// report a new picker -- we want to stay in CONNECTING while we wait
// for the health status notification.
// If health checking is NOT enabled, report READY.
if (subchannel_list_->enable_health_watch_) {
if (p->enable_health_watch_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "[PF %p] starting health watch", p);
}
@ -777,9 +782,6 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
: nullptr),
policy_(std::move(policy)),
enable_health_watch_(
args.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
.value_or(false)),
args_(args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING)
.Remove(
GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {

@ -172,7 +172,8 @@ class RingHash : public LoadBalancingPolicy {
void Orphan() override;
size_t index() const { return index_; }
void set_index(size_t index) { index_ = index; }
void UpdateLocked(size_t index);
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
@ -199,6 +200,7 @@ class RingHash : public LoadBalancingPolicy {
class Helper;
void CreateChildPolicy();
void UpdateChildPolicyLocked();
// Called when the child policy reports a connectivity state update.
void OnStateUpdate(grpc_connectivity_state new_state,
@ -504,6 +506,11 @@ void RingHash::RingHashEndpoint::Orphan() {
Unref();
}
void RingHash::RingHashEndpoint::UpdateLocked(size_t index) {
index_ = index;
if (child_policy_ != nullptr) UpdateChildPolicyLocked();
}
void RingHash::RingHashEndpoint::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
@ -518,20 +525,19 @@ void RingHash::RingHashEndpoint::RequestConnectionLocked() {
void RingHash::RingHashEndpoint::CreateChildPolicy() {
GPR_ASSERT(child_policy_ == nullptr);
const ServerAddress& address = ring_hash_->addresses_[index_];
LoadBalancingPolicy::Args lb_policy_args;
auto child_args =
lb_policy_args.work_serializer = ring_hash_->work_serializer();
lb_policy_args.args =
ring_hash_->args_
.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
.Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
lb_policy_args.work_serializer = ring_hash_->work_serializer();
lb_policy_args.args = child_args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
child_policy_ =
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
"pick_first", std::move(lb_policy_args));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
const ServerAddress& address = ring_hash_->addresses_[index_];
gpr_log(GPR_INFO,
"[RH %p] endpoint %p (index %" PRIuPTR " of %" PRIuPTR
", %s): created child policy %p",
@ -543,6 +549,10 @@ void RingHash::RingHashEndpoint::CreateChildPolicy() {
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
ring_hash_->interested_parties());
UpdateChildPolicyLocked();
}
void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
// Construct pick_first config.
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
@ -551,8 +561,8 @@ void RingHash::RingHashEndpoint::CreateChildPolicy() {
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(address);
update_args.args = std::move(child_args);
update_args.addresses.emplace().emplace_back(ring_hash_->addresses_[index_]);
update_args.args = ring_hash_->args_;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
// we need to propagate that back to the resolver somehow.
@ -644,7 +654,7 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
// If present in old map, retain it; otherwise, create a new one.
auto it = endpoint_map_.find(address);
if (it != endpoint_map_.end()) {
it->second->set_index(i);
it->second->UpdateLocked(i);
endpoint_map.emplace(address, std::move(it->second));
} else {
endpoint_map.emplace(address, MakeOrphanable<RingHashEndpoint>(Ref(), i));

Loading…
Cancel
Save