xds_override_host LB: add basic support for overriding host (#31954)

This is same as #31819 but with fixed compilation issues.
pull/31035/head
Eugene Ostroukhov 2 years ago committed by GitHub
parent feb43c589f
commit 878a0ea6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 2
      build_autogenerated.yaml
  3. 2
      gRPC-C++.podspec
  4. 2
      gRPC-Core.podspec
  5. 1
      grpc.gemspec
  6. 1
      package.xml
  7. 6
      src/core/BUILD
  8. 5
      src/core/ext/filters/client_channel/client_channel.h
  9. 39
      src/core/ext/filters/client_channel/lb_call_state_internal.h
  10. 187
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  11. 72
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  12. 140
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  13. 1
      tools/doxygen/Doxyfile.c++.internal
  14. 1
      tools/doxygen/Doxyfile.core.internal

@ -2647,6 +2647,7 @@ grpc_cc_library(
"//src/core:ext/filters/client_channel/global_subchannel_pool.h",
"//src/core:ext/filters/client_channel/health/health_check_client.h",
"//src/core:ext/filters/client_channel/http_proxy.h",
"//src/core:ext/filters/client_channel/lb_call_state_internal.h",
"//src/core:ext/filters/client_channel/lb_policy/child_policy_handler.h",
"//src/core:ext/filters/client_channel/lb_policy/oob_backend_metric.h",
"//src/core:ext/filters/client_channel/local_subchannel_pool.h",

@ -335,6 +335,7 @@ libs:
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_call_state_internal.h
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
@ -1935,6 +1936,7 @@ libs:
- src/core/ext/filters/client_channel/global_subchannel_pool.h
- src/core/ext/filters/client_channel/health/health_check_client.h
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_call_state_internal.h
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h

2
gRPC-C++.podspec generated

@ -250,6 +250,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_call_state_internal.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
@ -1161,6 +1162,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_call_state_internal.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',

2
gRPC-Core.podspec generated

@ -234,6 +234,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_proxy.cc',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_call_state_internal.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
@ -1833,6 +1834,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/global_subchannel_pool.h',
'src/core/ext/filters/client_channel/health/health_check_client.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_call_state_internal.h',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',

1
grpc.gemspec generated

@ -145,6 +145,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/health/health_check_client.h )
s.files += %w( src/core/ext/filters/client_channel/http_proxy.cc )
s.files += %w( src/core/ext/filters/client_channel/http_proxy.h )
s.files += %w( src/core/ext/filters/client_channel/lb_call_state_internal.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/address_filtering.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h )

1
package.xml generated

@ -127,6 +127,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/health/health_check_client.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_call_state_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/address_filtering.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/address_filtering.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" role="src" />

@ -4449,13 +4449,18 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/xds/xds_override_host.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
deps = [
"channel_args",
"grpc_stateful_session_filter",
"json",
"json_args",
"json_object_loader",
@ -4474,6 +4479,7 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",
"//:sockaddr_utils",
],
)

@ -40,6 +40,7 @@
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/lb_call_state_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
@ -390,7 +391,7 @@ class ClientChannel {
class ClientChannel::LoadBalancedCall
: public InternallyRefCounted<LoadBalancedCall, kUnrefCallDtor> {
public:
class LbCallState : public LoadBalancingPolicy::CallState {
class LbCallState : public LbCallStateInternal {
public:
explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
@ -398,7 +399,7 @@ class ClientChannel::LoadBalancedCall
// Internal API to allow first-party LB policies to access per-call
// attributes set by the ConfigSelector.
absl::string_view GetCallAttribute(UniqueTypeName type);
absl::string_view GetCallAttribute(UniqueTypeName type) override;
private:
LoadBalancedCall* lb_call_;

@ -0,0 +1,39 @@
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_CALL_STATE_INTERNAL_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_CALL_STATE_INTERNAL_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/load_balancing/lb_policy.h"
namespace grpc_core {
//
// LbCallStateInternal
//
class LbCallStateInternal : public LoadBalancingPolicy::CallState {
public:
virtual absl::string_view GetCallAttribute(UniqueTypeName type) = 0;
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_CALL_STATE_INTERNAL_H

@ -16,20 +16,30 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_call_state_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -101,12 +111,13 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
// present.
class Picker : public SubchannelPicker {
public:
Picker(XdsOverrideHostLb* xds_override_host_lb,
Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker);
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<XdsOverrideHostLb> policy_;
RefCountedPtr<SubchannelPicker> picker_;
};
@ -133,6 +144,42 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy_;
};
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::optional<const std::string> key);
~SubchannelWrapper() override;
private:
const absl::optional<const std::string> key_;
RefCountedPtr<XdsOverrideHostLb> policy_;
};
class SubchannelEntry {
public:
void SetSubchannel(SubchannelWrapper* subchannel) {
subchannel_ = subchannel;
}
void ResetSubchannel(SubchannelWrapper* expected) {
if (subchannel_ == expected) {
subchannel_ = nullptr;
}
}
RefCountedPtr<SubchannelWrapper> GetSubchannel() {
if (subchannel_ == nullptr) {
return nullptr;
}
return subchannel_->Ref();
}
private:
SubchannelWrapper* subchannel_ = nullptr;
};
~XdsOverrideHostLb() override;
void ShutdownLocked() override;
@ -142,6 +189,15 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
RefCountedPtr<SubchannelWrapper> LookupSubchannel(absl::string_view address);
void UpdateAddressMap(const absl::StatusOr<ServerAddressList>& addresses);
RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel);
void ResetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
// Current config from the resolver.
RefCountedPtr<XdsOverrideHostLbConfig> config_;
@ -154,29 +210,47 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<SubchannelPicker> picker_;
absl::Mutex subchannel_map_mu_;
std::map<std::string, SubchannelEntry, std::less<>> subchannel_map_
ABSL_GUARDED_BY(subchannel_map_mu_);
};
//
// XdsOverrideHostLb::Picker
//
XdsOverrideHostLb::Picker::Picker(XdsOverrideHostLb* xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker)
: picker_(std::move(picker)) {
XdsOverrideHostLb::Picker::Picker(
RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker)
: policy_(std::move(xds_override_host_lb)), picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
xds_override_host_lb, this);
policy_.get(), this);
}
}
LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
auto* call_state = static_cast<LbCallStateInternal*>(args.call_state);
auto override_host = call_state->GetCallAttribute(XdsHostOverrideTypeName());
if (!override_host.empty()) {
auto subchannel = policy_->LookupSubchannel(override_host);
if (subchannel != nullptr) {
return PickResult::Complete(subchannel->wrapped_subchannel());
}
}
if (picker_ == nullptr) { // Should never happen.
return PickResult::Fail(absl::InternalError(
"xds_override_host picker not given any child picker"));
}
// Delegate to child picker
return picker_->Pick(args);
auto result = picker_->Pick(args);
auto complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
complete_pick->subchannel =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get())
->wrapped_subchannel();
}
return result;
}
//
@ -237,6 +311,7 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
UpdateAddressMap(args.addresses);
// Update child policy.
UpdateArgs update_args;
update_args.addresses = std::move(args.addresses);
@ -253,7 +328,7 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto xds_override_host_picker = MakeRefCounted<Picker>(this, picker_);
auto xds_override_host_picker = MakeRefCounted<Picker>(Ref(), picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] updating connectivity: state=%s "
@ -289,27 +364,87 @@ OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::LookupSubchannel(absl::string_view address) {
absl::MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(address);
if (it != subchannel_map_.end()) {
return it->second.GetSubchannel();
}
return nullptr;
}
void XdsOverrideHostLb::UpdateAddressMap(
const absl::StatusOr<ServerAddressList>& addresses) {
std::unordered_set<std::string> keys(addresses->size());
if (addresses.ok()) {
for (const auto& address : *addresses) {
auto key = grpc_sockaddr_to_string(&address.address(), false);
if (key.ok()) {
keys.insert(std::move(*key));
}
}
}
absl::MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (keys.find(it->first) == keys.end()) {
it = subchannel_map_.erase(it);
} else {
++it;
}
}
for (const auto& key : keys) {
if (subchannel_map_.find(key) == subchannel_map_.end()) {
subchannel_map_.emplace(key, SubchannelEntry());
}
}
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::AdoptSubchannel(
ServerAddress address, RefCountedPtr<SubchannelInterface> subchannel) {
auto subchannel_key = grpc_sockaddr_to_string(&address.address(), false);
absl::optional<std::string> key;
if (subchannel_key.ok()) {
key = std::move(*subchannel_key);
}
auto wrapper =
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), key);
if (key.has_value()) {
absl::MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
}
}
return wrapper;
}
void XdsOverrideHostLb::ResetSubchannel(absl::string_view key,
SubchannelWrapper* subchannel) {
absl::MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(key);
if (it != subchannel_map_.end()) {
it->second.ResetSubchannel(subchannel);
}
}
//
// XdsOverrideHostLb::Helper
//
RefCountedPtr<SubchannelInterface> XdsOverrideHostLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
return xds_override_host_policy_->channel_control_helper()->CreateSubchannel(
address, args);
auto subchannel =
xds_override_host_policy_->channel_control_helper()->CreateSubchannel(
address, args);
return xds_override_host_policy_->AdoptSubchannel(address, subchannel);
}
void XdsOverrideHostLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) {
if (xds_override_host_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] child connectivity state update: "
"state=%s (%s) picker=%p",
xds_override_host_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
// Save the state and picker.
xds_override_host_policy_->state_ = state;
xds_override_host_policy_->status_ = status;
@ -339,6 +474,24 @@ void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity,
message);
}
//
// XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper
//
XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::optional<const std::string> key)
: DelegatingSubchannel(std::move(subchannel)),
key_(std::move(key)),
policy_(std::move(policy)) {}
XdsOverrideHostLb::SubchannelWrapper::~SubchannelWrapper() {
if (key_.has_value()) {
policy_->ResetSubchannel(*key_, this);
}
}
//
// factory
//

@ -48,6 +48,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_call_state_internal.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
@ -59,6 +60,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolved_address.h"
@ -358,8 +360,11 @@ class LoadBalancingPolicyTest : public ::testing::Test {
};
// A fake CallState implementation, for use in PickArgs.
class FakeCallState : public LoadBalancingPolicy::CallState {
class FakeCallState : public LbCallStateInternal {
public:
explicit FakeCallState(std::map<UniqueTypeName, std::string> attributes)
: attributes_(attributes) {}
~FakeCallState() override {
for (void* allocation : allocations_) {
gpr_free(allocation);
@ -373,7 +378,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return allocation;
}
absl::string_view GetCallAttribute(UniqueTypeName type) override {
return attributes_[type];
}
std::vector<void*> allocations_;
std::map<UniqueTypeName, std::string> attributes_;
};
LoadBalancingPolicyTest()
@ -549,13 +559,17 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Waits for the round_robin policy to start using an updated address list.
// There can be any number of READY updates where the picker is still using
// the old list followed by one READY update where the picker is using the
// new list. Returns true if the reported states match expectations.
bool WaitForRoundRobinListChange(
// new list. Returns a picker if the reported states match expectations.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
WaitForRoundRobinListChange(
absl::Span<const absl::string_view> old_addresses,
absl::Span<const absl::string_view> new_addresses,
const std::map<UniqueTypeName, std::string> call_attributes = {},
size_t num_iterations = 3, SourceLocation location = SourceLocation()) {
gpr_log(GPR_INFO, "Waiting for expected RR addresses...");
bool retval = false;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> retval;
size_t num_picks =
std::max(new_addresses.size(), old_addresses.size()) * num_iterations;
WaitForStateUpdate(
[&](FakeHelper::StateUpdate update) {
EXPECT_EQ(update.state, GRPC_CHANNEL_READY)
@ -563,8 +577,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
if (update.state != GRPC_CHANNEL_READY) return false;
// Get enough picks to round-robin num_iterations times across all
// expected addresses.
auto picks = GetCompletePicks(update.picker.get(),
new_addresses.size() * num_iterations);
auto picks = GetCompletePicks(update.picker.get(), num_picks,
call_attributes, location);
EXPECT_TRUE(picks.has_value())
<< location.file() << ":" << location.line();
if (!picks.has_value()) return false;
@ -572,11 +586,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// If the picks still match the old list, then keep going.
if (PicksAreRoundRobin(old_addresses, *picks)) return true;
// Otherwise, the picks should match the new list.
retval = PicksAreRoundRobin(new_addresses, *picks);
EXPECT_TRUE(retval)
bool matches = PicksAreRoundRobin(new_addresses, *picks);
EXPECT_TRUE(matches)
<< "Expected: " << absl::StrJoin(new_addresses, ", ")
<< "\nActual: " << absl::StrJoin(*picks, ", ") << "\nat "
<< location.file() << ":" << location.line();
if (matches) {
retval = std::move(update.picker);
}
return false; // Stop.
},
location);
@ -599,12 +616,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
location);
}
static std::unique_ptr<LoadBalancingPolicy::MetadataInterface> MakeMetadata(
std::map<std::string, std::string> init = {}) {
return std::make_unique<FakeMetadata>(init);
}
// Does a pick and returns the result.
LoadBalancingPolicy::PickResult DoPick(
LoadBalancingPolicy::SubchannelPicker* picker) {
LoadBalancingPolicy::SubchannelPicker* picker,
const std::map<UniqueTypeName, std::string>& call_attributes = {}) {
ExecCtx exec_ctx;
FakeMetadata metadata({});
FakeCallState call_state;
FakeCallState call_state(call_attributes);
return picker->Pick({"/service/method", &metadata, &call_state});
}
@ -623,8 +646,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// the result was something other than Complete.
absl::optional<std::string> ExpectPickComplete(
LoadBalancingPolicy::SubchannelPicker* picker,
const std::map<UniqueTypeName, std::string> call_attributes = {},
SourceLocation location = SourceLocation()) {
auto pick_result = DoPick(picker);
auto pick_result = DoPick(picker, call_attributes);
auto* complete = absl::get_if<LoadBalancingPolicy::PickResult::Complete>(
&pick_result.result);
EXPECT_NE(complete, nullptr) << PickResultString(pick_result) << " at "
@ -639,10 +663,15 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// list of addresses, or nullopt if a non-complete pick was returned.
absl::optional<std::vector<std::string>> GetCompletePicks(
LoadBalancingPolicy::SubchannelPicker* picker, size_t num_picks,
const std::map<UniqueTypeName, std::string> call_attributes = {},
SourceLocation location = SourceLocation()) {
EXPECT_NE(picker, nullptr);
if (picker == nullptr) {
return absl::nullopt;
}
std::vector<std::string> results;
for (size_t i = 0; i < num_picks; ++i) {
auto address = ExpectPickComplete(picker, location);
auto address = ExpectPickComplete(picker, call_attributes, location);
if (!address.has_value()) return absl::nullopt;
results.emplace_back(std::move(*address));
}
@ -668,17 +697,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Checks that the picker has round-robin behavior over the specified
// set of addresses.
void ExpectRoundRobinPicks(LoadBalancingPolicy::SubchannelPicker* picker,
absl::Span<const absl::string_view> addresses,
size_t num_iterations = 3,
SourceLocation location = SourceLocation()) {
auto picks =
GetCompletePicks(picker, num_iterations * addresses.size(), location);
void ExpectRoundRobinPicks(
LoadBalancingPolicy::SubchannelPicker* picker,
absl::Span<const absl::string_view> addresses,
const std::map<UniqueTypeName, std::string> call_attributes = {},
size_t num_iterations = 3, SourceLocation location = SourceLocation()) {
auto picks = GetCompletePicks(picker, num_iterations * addresses.size(),
call_attributes, location);
ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line();
EXPECT_TRUE(PicksAreRoundRobin(addresses, *picks))
<< "Expected: " << absl::StrJoin(addresses, ", ")
<< "Actual: " << absl::StrJoin(*picks, ", ") << location.file() << ":"
<< location.line();
<< " Actual: " << absl::StrJoin(*picks, ", ")
<< "\n Expected: " << absl::StrJoin(addresses, ", ") << "\n"
<< location.file() << ":" << location.line();
}
// Requests a picker on picker and expects a Fail result.

@ -17,6 +17,7 @@
#include <stddef.h>
#include <array>
#include <map>
#include <string>
#include "absl/status/status.h"
@ -26,8 +27,10 @@
#include <grpc/grpc.h>
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
@ -50,47 +53,38 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}});
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
ExpectConnectingUpdate();
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
return picker;
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate();
// RR should have created a subchannel for each address.
for (size_t i = 0; i < kAddresses.size(); ++i) {
auto* subchannel = FindSubchannel(kAddresses[i]);
ASSERT_NE(subchannel, nullptr);
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// As each subchannel becomes READY, we should get a new picker that
// includes the behavior. Note that there may be any number of
// duplicate updates for the previous state in the queue before the
// update that we actually want to see.
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
auto picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {kAddresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).subspan(0, i),
absl::MakeSpan(kAddresses).subspan(0, i + 1));
}
}
ExpectStartupWithRoundRobin(
{"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"});
}
TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
@ -100,6 +94,78 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
absl::InvalidArgumentError("Missing policy config"));
}
TEST_F(XdsOverrideHostTest, OverrideHost) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
std::map<UniqueTypeName, std::string> call_attributes{
{XdsHostOverrideTypeName(), "127.0.0.1:442"}};
EXPECT_EQ(ExpectPickComplete(picker.get(), call_attributes), kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(), call_attributes), kAddresses[1]);
call_attributes[XdsHostOverrideTypeName()] = std::string("127.0.0.1:441");
EXPECT_EQ(ExpectPickComplete(picker.get(), call_attributes), kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(), call_attributes), kAddresses[0]);
}
TEST_F(XdsOverrideHostTest, OverrideHostSubchannelNotFound) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
std::map<UniqueTypeName, std::string> call_attributes{
{XdsHostOverrideTypeName(), "no such host"}};
ExpectRoundRobinPicks(picker.get(), kAddresses, call_attributes);
}
TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
std::map<UniqueTypeName, std::string> call_attributes{
{XdsHostOverrideTypeName(), "127.0.0.1:442"}};
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, call_attributes);
// Some other address is gone
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]},
MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return a new picker that uses the updated
// addresses. We can't use the host override for this, because then
// we won't know when the new picker is actually using all of the new
// addresses.
picker =
WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[1]});
// Make sure host override still works.
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, call_attributes);
// "Our" address is gone so others get returned in round-robin order
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]},
MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
// In this case, we can pass call_attributes while we wait instead of
// checking again afterward, because the host override won't actually
// be used.
WaitForRoundRobinListChange({kAddresses[0], kAddresses[1]},
{kAddresses[0], kAddresses[2]}, call_attributes);
// And now it is back
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]},
MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
{kAddresses[1], kAddresses[2]});
// Make sure host override works.
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, call_attributes);
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -1101,6 +1101,7 @@ src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/health/health_check_client.h \
src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/http_proxy.h \
src/core/ext/filters/client_channel/lb_call_state_internal.h \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \

@ -908,6 +908,7 @@ src/core/ext/filters/client_channel/health/health_check_client.cc \
src/core/ext/filters/client_channel/health/health_check_client.h \
src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/http_proxy.h \
src/core/ext/filters/client_channel/lb_call_state_internal.h \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \

Loading…
Cancel
Save