Introduce the xds_override_host policy (#31730)

Introduce the xds_override_host policy
reviewable/pr31717/r8^2
Eugene Ostroukhov 2 years ago committed by GitHub
parent 1d968a36aa
commit 1e13612d4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 77
      CMakeLists.txt
  3. 2
      Makefile
  4. 20
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 1
      gRPC-Core.podspec
  8. 1
      grpc.gemspec
  9. 1
      grpc.gyp
  10. 1
      package.xml
  11. 34
      src/core/BUILD
  12. 410
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  13. 6
      src/core/plugin_registry/grpc_plugin_registry.cc
  14. 3
      src/core/plugin_registry/grpc_plugin_registry_extra.cc
  15. 1
      src/python/grpcio/grpc_core_dependencies.py
  16. 14
      test/core/client_channel/BUILD
  17. 12
      test/core/client_channel/lb_policy/BUILD
  18. 53
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  19. 88
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  20. 119
      test/core/client_channel/xds_override_host_lb_config_parser_test.cc
  21. 1
      tools/doxygen/Doxyfile.c++.internal
  22. 1
      tools/doxygen/Doxyfile.core.internal
  23. 48
      tools/run_tests/generated/tests.json

@ -523,6 +523,7 @@ GRPC_XDS_TARGETS = [
"//src/core:grpc_lb_policy_xds_cluster_impl",
"//src/core:grpc_lb_policy_xds_cluster_manager",
"//src/core:grpc_lb_policy_xds_cluster_resolver",
"//src/core:grpc_lb_policy_xds_override_host",
"//src/core:grpc_lb_policy_xds_wrr_locality",
"//src/core:grpc_resolver_xds",
"//src/core:grpc_resolver_c2p",

77
CMakeLists.txt generated

@ -1290,6 +1290,8 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_outlier_detection_end2end_test)
endif()
add_dependencies(buildtests_cxx xds_override_host_lb_config_parser_test)
add_dependencies(buildtests_cxx xds_override_host_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_ring_hash_end2end_test)
endif()
@ -1699,6 +1701,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc
src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
@ -24156,6 +24159,80 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_override_host_lb_config_parser_test
test/core/client_channel/xds_override_host_lb_config_parser_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(xds_override_host_lb_config_parser_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_override_host_lb_config_parser_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_override_host_test
test/core/client_channel/lb_policy/xds_override_host_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(xds_override_host_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_override_host_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

2
Makefile generated

@ -994,6 +994,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \
@ -2904,6 +2905,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc: $(OPENSSL_D
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)

@ -1082,6 +1082,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc
- src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
@ -13130,6 +13131,25 @@ targets:
- linux
- posix
- mac
- name: xds_override_host_lb_config_parser_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/client_channel/xds_override_host_lb_config_parser_test.cc
deps:
- grpc_test_util
- name: xds_override_host_test
gtest: true
build: test
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
src:
- test/core/client_channel/lb_policy/xds_override_host_test.cc
deps:
- grpc_test_util
- name: xds_ring_hash_end2end_test
gtest: true
build: test

1
config.m4 generated

@ -76,6 +76,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \

1
config.w32 generated

@ -42,6 +42,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_override_host.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_wrr_locality.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\binder\\binder_resolver.cc " +

1
gRPC-Core.podspec generated

@ -264,6 +264,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.h',

1
grpc.gemspec generated

@ -175,6 +175,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc )
s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.cc )
s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.h )

1
grpc.gyp generated

@ -406,6 +406,7 @@
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc',

1
package.xml generated

@ -157,6 +157,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/local_subchannel_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/local_subchannel_pool.h" role="src" />

@ -4309,6 +4309,40 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_lb_policy_xds_override_host",
srcs = [
"ext/filters/client_channel/lb_policy/xds/xds_override_host.cc",
],
external_deps = [
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
language = "c++",
deps = [
"channel_args",
"grpc_lb_subchannel_list",
"json",
"json_args",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"pollset_set",
"subchannel_interface",
"//:config",
"//:debug_location",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",
],
)
grpc_cc_library(
name = "lb_server_load_reporting_filter",
srcs = [

@ -0,0 +1,410 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
namespace {
//
// xds_override_host LB policy
//
constexpr absl::string_view kXdsOverrideHost = "xds_override_host_experimental";
// Config for stateful session LB policy.
class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config {
public:
XdsOverrideHostLbConfig() = default;
XdsOverrideHostLbConfig(const XdsOverrideHostLbConfig&) = delete;
XdsOverrideHostLbConfig& operator=(const XdsOverrideHostLbConfig&) = delete;
XdsOverrideHostLbConfig(XdsOverrideHostLbConfig&& other) = delete;
XdsOverrideHostLbConfig& operator=(XdsOverrideHostLbConfig&& other) = delete;
absl::string_view name() const override { return kXdsOverrideHost; }
RefCountedPtr<LoadBalancingPolicy::Config> child_config() const {
return child_config_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors);
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_config_;
};
// xDS Cluster Impl LB policy.
class XdsOverrideHostLb : public LoadBalancingPolicy {
public:
explicit XdsOverrideHostLb(Args args);
absl::string_view name() const override { return kXdsOverrideHost; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// A picker that wraps the picker from the child for cases when cookie is
// present.
class Picker : public SubchannelPicker {
public:
Picker(XdsOverrideHostLb* xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker);
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<SubchannelPicker> picker_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy)
: xds_override_host_policy_(std::move(xds_override_host_policy)) {}
~Helper() override {
xds_override_host_policy_.reset(DEBUG_LOCATION, "Helper");
}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy_;
};
~XdsOverrideHostLb() override;
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
void MaybeUpdatePickerLocked();
// Current config from the resolver.
RefCountedPtr<XdsOverrideHostLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<SubchannelPicker> picker_;
};
//
// XdsOverrideHostLb::Picker
//
XdsOverrideHostLb::Picker::Picker(XdsOverrideHostLb* xds_override_host_lb,
RefCountedPtr<SubchannelPicker> picker)
: picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
xds_override_host_lb, this);
}
}
LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
if (picker_ == nullptr) { // Should never happen.
return PickResult::Fail(absl::InternalError(
"xds_override_host picker not given any child picker"));
}
// Delegate to child picker
return picker_->Pick(args);
}
//
// XdsOverrideHostLb
//
XdsOverrideHostLb::XdsOverrideHostLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] created", this);
}
}
XdsOverrideHostLb::~XdsOverrideHostLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] destroying xds_override_host LB policy",
this);
}
}
void XdsOverrideHostLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this);
}
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_.reset();
}
void XdsOverrideHostLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void XdsOverrideHostLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this);
}
auto old_config = std::move(config_);
// Update config.
config_ = std::move(args.config);
if (config_ == nullptr) {
return absl::InvalidArgumentError("Missing policy config");
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
// Update child policy.
UpdateArgs update_args;
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_config();
update_args.args = std::move(args.args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
return child_policy_->UpdateLocked(std::move(update_args));
}
void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
if (picker_ != nullptr) {
auto xds_override_host_picker = MakeRefCounted<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] updating connectivity: state=%s "
"status=(%s) picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(),
xds_override_host_picker.get());
}
channel_control_helper()->UpdateState(state_, status_,
std::move(xds_override_host_picker));
}
}
OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
const ChannelArgs& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_xds_override_host_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] Created new child policy handler %p",
this, lb_policy.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
//
// XdsOverrideHostLb::Helper
//
RefCountedPtr<SubchannelInterface> XdsOverrideHostLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
return xds_override_host_policy_->channel_control_helper()->CreateSubchannel(
address, args);
}
void XdsOverrideHostLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) {
if (xds_override_host_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] child connectivity state update: "
"state=%s (%s) picker=%p",
xds_override_host_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
// Save the state and picker.
xds_override_host_policy_->state_ = state;
xds_override_host_policy_->status_ = status;
xds_override_host_policy_->picker_ = std::move(picker);
// Wrap the picker and return it to the channel.
xds_override_host_policy_->MaybeUpdatePickerLocked();
}
void XdsOverrideHostLb::Helper::RequestReresolution() {
if (xds_override_host_policy_->shutting_down_) return;
xds_override_host_policy_->channel_control_helper()->RequestReresolution();
}
absl::string_view XdsOverrideHostLb::Helper::GetAuthority() {
return xds_override_host_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
XdsOverrideHostLb::Helper::GetEventEngine() {
return xds_override_host_policy_->channel_control_helper()->GetEventEngine();
}
void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (xds_override_host_policy_->shutting_down_) return;
xds_override_host_policy_->channel_control_helper()->AddTraceEvent(severity,
message);
}
//
// factory
//
const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader(
const JsonArgs&) {
static const auto kJsonLoader =
JsonObjectLoader<XdsOverrideHostLbConfig>()
// Child policy config is parsed in JsonPostLoad
.Finish();
return kJsonLoader;
}
void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors) {
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
} else {
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
child_config_ = std::move(*child_policy_config);
}
}
}
class XdsOverrideHostLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsOverrideHostLb>(std::move(args));
}
absl::string_view name() const override { return kXdsOverrideHost; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
if (json.type() == Json::Type::JSON_NULL) {
// This policy was configured in the deprecated loadBalancingPolicy
// field or in the client API.
return absl::InvalidArgumentError(
"field:loadBalancingPolicy error:xds_override_host policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
return LoadRefCountedFromJson<XdsOverrideHostLbConfig>(
json, JsonArgs(),
"errors validating xds_override_host LB policy config");
}
};
} // namespace
void RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<XdsOverrideHostLbFactory>());
}
} // namespace grpc_core

@ -27,7 +27,8 @@
namespace grpc_event_engine {
namespace experimental {
extern void RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder* builder);
extern void RegisterEventEngineChannelArgPreconditioning(
grpc_core::CoreConfiguration::Builder* builder);
} // namespace experimental
} // namespace grpc_event_engine
@ -69,7 +70,8 @@ extern void RegisterBinderResolver(CoreConfiguration::Builder* builder);
#endif
void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(builder);
grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(
builder);
// The order of the handshaker registration is crucial here.
// We want TCP connect handshaker to be registered last so that it is added to
// the start of the handshaker list.

@ -35,6 +35,8 @@ extern void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder);
extern void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder);
extern void RegisterXdsClusterResolverLbPolicy(
CoreConfiguration::Builder* builder);
extern void RegisterXdsOverrideHostLbPolicy(
CoreConfiguration::Builder* builder);
extern void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder);
extern void RegisterFileWatcherCertificateProvider(
CoreConfiguration::Builder* builder);
@ -54,6 +56,7 @@ void RegisterExtraFilters(CoreConfiguration::Builder* builder) {
RegisterXdsClusterImplLbPolicy(builder);
RegisterCdsLbPolicy(builder);
RegisterXdsClusterResolverLbPolicy(builder);
RegisterXdsOverrideHostLbPolicy(builder);
RegisterXdsWrrLocalityLbPolicy(builder);
RegisterFileWatcherCertificateProvider(builder);
#endif

@ -51,6 +51,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc',

@ -120,3 +120,17 @@ grpc_cc_test(
"//test/core/util:scoped_env_var",
],
)
grpc_cc_test(
name = "xds_override_host_lb_config_parser_test",
srcs = ["xds_override_host_lb_config_parser_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
tags = ["no_test_ios"],
deps = [
"//:grpc",
"//test/core/util:grpc_test_util",
],
)

@ -62,3 +62,15 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "xds_override_host_test",
srcs = ["xds_override_host_test.cc"],
external_deps = ["gtest"],
language = "C++",
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_xds_override_host",
"//test/core/util:grpc_test_util",
],
)

@ -220,9 +220,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Called at test tear-down time to ensure that we have not left any
// unexpected events in the queue.
void ExpectQueueEmpty() {
void ExpectQueueEmpty(SourceLocation location = SourceLocation()) {
MutexLock lock(&mu_);
EXPECT_TRUE(queue_.empty());
EXPECT_TRUE(queue_.empty()) << location.file() << ":" << location.line();
for (const Event& event : queue_) {
gpr_log(GPR_ERROR, "UNEXPECTED EVENT LEFT IN QUEUE: %s",
EventString(event).c_str());
@ -375,7 +375,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
LoadBalancingPolicyTest()
: work_serializer_(std::make_shared<WorkSerializer>()) {}
~LoadBalancingPolicyTest() override {
void TearDown() override {
// Note: Can't safely trigger this from inside the FakeHelper dtor,
// because if there is a picker in the queue that is holding a ref
// to the LB policy, that will prevent the LB policy from being
@ -400,11 +400,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Creates an LB policy config from json.
static RefCountedPtr<LoadBalancingPolicy::Config> MakeConfig(
const Json& json) {
return CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(json)
.value();
const Json& json, SourceLocation location = SourceLocation()) {
auto status_or_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
json);
EXPECT_TRUE(status_or_config.ok())
<< status_or_config.status() << "\n"
<< location.file() << ":" << location.line();
return status_or_config.value();
}
// Converts an address URI into a grpc_resolved_address.
@ -485,20 +488,22 @@ class LoadBalancingPolicyTest : public ::testing::Test {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> WaitForConnected(
SourceLocation location = SourceLocation()) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> final_picker;
WaitForStateUpdate([&](FakeHelper::StateUpdate update) {
if (update.state == GRPC_CHANNEL_CONNECTING) {
EXPECT_TRUE(update.status.ok())
<< update.status << " at " << location.file() << ":"
<< location.line();
ExpectPickQueued(update.picker.get(), location);
return true; // Keep going.
}
EXPECT_EQ(update.state, GRPC_CHANNEL_READY)
<< ConnectivityStateName(update.state) << " at " << location.file()
<< ":" << location.line();
final_picker = std::move(update.picker);
return false; // Stop.
});
WaitForStateUpdate(
[&](FakeHelper::StateUpdate update) {
if (update.state == GRPC_CHANNEL_CONNECTING) {
EXPECT_TRUE(update.status.ok())
<< update.status << " at " << location.file() << ":"
<< location.line();
ExpectPickQueued(update.picker.get(), location);
return true; // Keep going.
}
EXPECT_EQ(update.state, GRPC_CHANNEL_READY)
<< ConnectivityStateName(update.state) << " at "
<< location.file() << ":" << location.line();
final_picker = std::move(update.picker);
return false; // Stop.
},
location);
return final_picker;
}
@ -644,6 +649,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return &it->second;
}
void ExpectQueueEmpty(SourceLocation location = SourceLocation()) {
helper_->ExpectQueueEmpty(location);
}
std::shared_ptr<WorkSerializer> work_serializer_;
FakeHelper* helper_ = nullptr;
std::map<SubchannelKey, SubchannelState> subchannel_pool_;

@ -0,0 +1,88 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "gmock/gmock.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
class XdsOverrideHostTest : public LoadBalancingPolicyTest {
protected:
XdsOverrideHostTest()
: policy_(MakeLbPolicy("xds_override_host_experimental")) {}
RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
std::string child_policy = "pick_first") {
Json::Object child_policy_config = {{child_policy, Json::Object()}};
return MakeConfig(Json::Array{Json::Object{
{"xds_override_host_experimental",
Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}});
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
ASSERT_NE(policy_, nullptr);
const std::array<absl::string_view, 2> kAddresses = {"ipv4:127.0.0.1:441",
"ipv4:127.0.0.1:442"};
EXPECT_EQ(policy_->name(), "xds_override_host_experimental");
// 1. We use pick_first as a child
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
ExpectConnectingUpdate();
auto subchannel =
FindSubchannel({kAddresses[0]},
ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
ASSERT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
subchannel =
FindSubchannel({kAddresses[1]},
ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
ASSERT_FALSE(subchannel->ConnectionRequested());
auto picker = WaitForConnected();
// Pick first policy will always pick first!
EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441");
EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441");
}
TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
EXPECT_EQ(
ApplyUpdate(BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}),
policy_.get()),
absl::InvalidArgumentError("Missing policy config"));
}
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -0,0 +1,119 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
TEST(XdsOverrideHostConfigParsingTest, ValidConfig) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" {\"grpclb\":{}}\n"
" ]\n"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_TRUE(service_config.ok());
EXPECT_NE(*service_config, nullptr);
}
TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_FALSE(service_config.ok());
EXPECT_EQ(service_config.status(),
absl::InvalidArgumentError(
"errors validating service config: [field:loadBalancingConfig "
"error:errors validating xds_override_host LB policy config: "
"[field:childPolicy error:field not present]]"));
}
TEST(XdsOverrideHostConfigParsingTest, ReportsChildPolicyShouldBeArray) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":{\n"
" \"grpclb\":{},\n"
" }\n"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_FALSE(service_config.ok()) << service_config.status();
EXPECT_EQ(service_config.status(),
absl::InvalidArgumentError(
"errors validating service config: [field:loadBalancingConfig "
"error:errors validating xds_override_host LB policy config: "
"[field:childPolicy error:type should be array]]"));
}
TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) {
const char* service_config_json =
"{\n"
" \"loadBalancingConfig\":[{\n"
" \"xds_override_host_experimental\":{\n"
" \"childPolicy\":[\n"
" ]\n"
" }\n"
" }]\n"
"}\n";
auto service_config =
ServiceConfigImpl::Create(ChannelArgs(), service_config_json);
ASSERT_FALSE(service_config.ok()) << service_config.status();
EXPECT_EQ(service_config.status(),
absl::InvalidArgumentError(
"errors validating service config: [field:loadBalancingConfig "
"error:errors validating xds_override_host LB policy config: "
"[field:childPolicy error:No known policies in list: ]]"));
}
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -1135,6 +1135,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.h \

@ -938,6 +938,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.h \

@ -8575,6 +8575,54 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "xds_override_host_lb_config_parser_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "xds_override_host_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save