diff --git a/BUILD b/BUILD index 57a0401f7ea..74bd041822b 100644 --- a/BUILD +++ b/BUILD @@ -523,6 +523,7 @@ GRPC_XDS_TARGETS = [ "//src/core:grpc_lb_policy_xds_cluster_impl", "//src/core:grpc_lb_policy_xds_cluster_manager", "//src/core:grpc_lb_policy_xds_cluster_resolver", + "//src/core:grpc_lb_policy_xds_override_host", "//src/core:grpc_lb_policy_xds_wrr_locality", "//src/core:grpc_resolver_xds", "//src/core:grpc_resolver_c2p", diff --git a/CMakeLists.txt b/CMakeLists.txt index b5c6d87ab3f..e2b237d9499 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1290,6 +1290,8 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_outlier_detection_end2end_test) endif() + add_dependencies(buildtests_cxx xds_override_host_lb_config_parser_test) + add_dependencies(buildtests_cxx xds_override_host_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_ring_hash_end2end_test) endif() @@ -1699,6 +1701,7 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc + src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc src/core/ext/filters/client_channel/local_subchannel_pool.cc src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc @@ -24156,6 +24159,80 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) endif() +endif() +if(gRPC_BUILD_TESTS) + +add_executable(xds_override_host_lb_config_parser_test + test/core/client_channel/xds_override_host_lb_config_parser_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(xds_override_host_lb_config_parser_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_override_host_lb_config_parser_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + +endif() +if(gRPC_BUILD_TESTS) + +add_executable(xds_override_host_test + test/core/client_channel/lb_policy/xds_override_host_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(xds_override_host_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_override_host_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/Makefile b/Makefile index d129f51d6dd..79bd679cde3 100644 --- a/Makefile +++ b/Makefile @@ -994,6 +994,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \ @@ -2904,6 +2905,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.cc: $(OPENSSL_D src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc: $(OPENSSL_DEP) +src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3debebdb0e2..3233fda672c 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1082,6 +1082,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc - src/core/ext/filters/client_channel/local_subchannel_pool.cc - src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc @@ -13130,6 +13131,25 @@ targets: - linux - posix - mac +- name: xds_override_host_lb_config_parser_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/client_channel/xds_override_host_lb_config_parser_test.cc + deps: + - grpc_test_util +- name: xds_override_host_test + gtest: true + build: test + language: c++ + headers: + - test/core/client_channel/lb_policy/lb_policy_test_lib.h + src: + - test/core/client_channel/lb_policy/xds_override_host_test.cc + deps: + - grpc_test_util - name: xds_ring_hash_end2end_test gtest: true build: test diff --git a/config.m4 b/config.m4 index 626e3687e37..3a0ebfb6faa 100644 --- a/config.m4 +++ b/config.m4 @@ -76,6 +76,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc \ diff --git a/config.w32 b/config.w32 index 76aefbd9ed0..63a1e6795b4 100644 --- a/config.w32 +++ b/config.w32 @@ -42,6 +42,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_resolver.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_override_host.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_wrr_locality.cc " + "src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\binder\\binder_resolver.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 5dc33ecf241..664116e0db6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -264,6 +264,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.h', diff --git a/grpc.gemspec b/grpc.gemspec index cbcae9e3290..5cabc734dbc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -175,6 +175,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc ) s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.cc ) s.files += %w( src/core/ext/filters/client_channel/local_subchannel_pool.h ) diff --git a/grpc.gyp b/grpc.gyp index 7af6ae01910..f231ecb5353 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -406,6 +406,7 @@ 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc', diff --git a/package.xml b/package.xml index bd51bae7e38..f0325e19bf5 100644 --- a/package.xml +++ b/package.xml @@ -157,6 +157,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index 837a2bb12c9..7606378df28 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4309,6 +4309,40 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_policy_xds_override_host", + srcs = [ + "ext/filters/client_channel/lb_policy/xds/xds_override_host.cc", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/types:optional", + ], + language = "c++", + deps = [ + "channel_args", + "grpc_lb_subchannel_list", + "json", + "json_args", + "json_object_loader", + "lb_policy", + "lb_policy_factory", + "pollset_set", + "subchannel_interface", + "//:config", + "//:debug_location", + "//:gpr", + "//:grpc_base", + "//:grpc_client_channel", + "//:grpc_trace", + "//:orphanable", + "//:ref_counted_ptr", + "//:server_address", + ], +) + grpc_cc_library( name = "lb_server_load_reporting_filter", srcs = [ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc new file mode 100644 index 00000000000..209154ea59a --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -0,0 +1,410 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include +#include +#include + +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/json/json_args.h" +#include "src/core/lib/json/json_object_loader.h" +#include "src/core/lib/load_balancing/lb_policy.h" +#include "src/core/lib/load_balancing/lb_policy_factory.h" +#include "src/core/lib/load_balancing/subchannel_interface.h" +#include "src/core/lib/resolver/server_address.h" +#include "src/core/lib/transport/connectivity_state.h" + +namespace grpc_core { + +TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb"); + +namespace { + +// +// xds_override_host LB policy +// + +constexpr absl::string_view kXdsOverrideHost = "xds_override_host_experimental"; + +// Config for stateful session LB policy. +class XdsOverrideHostLbConfig : public LoadBalancingPolicy::Config { + public: + XdsOverrideHostLbConfig() = default; + + XdsOverrideHostLbConfig(const XdsOverrideHostLbConfig&) = delete; + XdsOverrideHostLbConfig& operator=(const XdsOverrideHostLbConfig&) = delete; + + XdsOverrideHostLbConfig(XdsOverrideHostLbConfig&& other) = delete; + XdsOverrideHostLbConfig& operator=(XdsOverrideHostLbConfig&& other) = delete; + + absl::string_view name() const override { return kXdsOverrideHost; } + + RefCountedPtr child_config() const { + return child_config_; + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&); + void JsonPostLoad(const Json& json, const JsonArgs&, + ValidationErrors* errors); + + private: + RefCountedPtr child_config_; +}; + +// xDS Cluster Impl LB policy. +class XdsOverrideHostLb : public LoadBalancingPolicy { + public: + explicit XdsOverrideHostLb(Args args); + + absl::string_view name() const override { return kXdsOverrideHost; } + + absl::Status UpdateLocked(UpdateArgs args) override; + void ExitIdleLocked() override; + void ResetBackoffLocked() override; + + private: + // A picker that wraps the picker from the child for cases when cookie is + // present. + class Picker : public SubchannelPicker { + public: + Picker(XdsOverrideHostLb* xds_override_host_lb, + RefCountedPtr picker); + + PickResult Pick(PickArgs args) override; + + private: + RefCountedPtr picker_; + }; + + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr xds_override_host_policy) + : xds_override_host_policy_(std::move(xds_override_host_policy)) {} + + ~Helper() override { + xds_override_host_policy_.reset(DEBUG_LOCATION, "Helper"); + } + + RefCountedPtr CreateSubchannel( + ServerAddress address, const ChannelArgs& args) override; + void UpdateState(grpc_connectivity_state state, const absl::Status& status, + RefCountedPtr picker) override; + void RequestReresolution() override; + absl::string_view GetAuthority() override; + grpc_event_engine::experimental::EventEngine* GetEventEngine() override; + void AddTraceEvent(TraceSeverity severity, + absl::string_view message) override; + + private: + RefCountedPtr xds_override_host_policy_; + }; + + ~XdsOverrideHostLb() override; + + void ShutdownLocked() override; + + OrphanablePtr CreateChildPolicyLocked( + const ChannelArgs& args); + + void MaybeUpdatePickerLocked(); + + // Current config from the resolver. + RefCountedPtr config_; + + // Internal state. + bool shutting_down_ = false; + + OrphanablePtr child_policy_; + + // Latest state and picker reported by the child policy. + grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; + absl::Status status_; + RefCountedPtr picker_; +}; + +// +// XdsOverrideHostLb::Picker +// + +XdsOverrideHostLb::Picker::Picker(XdsOverrideHostLb* xds_override_host_lb, + RefCountedPtr picker) + : picker_(std::move(picker)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p", + xds_override_host_lb, this); + } +} + +LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick( + LoadBalancingPolicy::PickArgs args) { + if (picker_ == nullptr) { // Should never happen. + return PickResult::Fail(absl::InternalError( + "xds_override_host picker not given any child picker")); + } + // Delegate to child picker + return picker_->Pick(args); +} + +// +// XdsOverrideHostLb +// + +XdsOverrideHostLb::XdsOverrideHostLb(Args args) + : LoadBalancingPolicy(std::move(args)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "[xds_override_host_lb %p] created", this); + } +} + +XdsOverrideHostLb::~XdsOverrideHostLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] destroying xds_override_host LB policy", + this); + } +} + +void XdsOverrideHostLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this); + } + shutting_down_ = true; + // Remove the child policy's interested_parties pollset_set from the + // xDS policy. + if (child_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } + // Drop our ref to the child's picker, in case it's holding a ref to + // the child. + picker_.reset(); +} + +void XdsOverrideHostLb::ExitIdleLocked() { + if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); +} + +void XdsOverrideHostLb::ResetBackoffLocked() { + if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); +} + +absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this); + } + auto old_config = std::move(config_); + // Update config. + config_ = std::move(args.config); + if (config_ == nullptr) { + return absl::InvalidArgumentError("Missing policy config"); + } + // Create child policy if needed. + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(args.args); + } + // Update child policy. + UpdateArgs update_args; + update_args.addresses = std::move(args.addresses); + update_args.resolution_note = std::move(args.resolution_note); + update_args.config = config_->child_config(); + update_args.args = std::move(args.args); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] Updating child policy handler %p", this, + child_policy_.get()); + } + return child_policy_->UpdateLocked(std::move(update_args)); +} + +void XdsOverrideHostLb::MaybeUpdatePickerLocked() { + if (picker_ != nullptr) { + auto xds_override_host_picker = MakeRefCounted(this, picker_); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] updating connectivity: state=%s " + "status=(%s) picker=%p", + this, ConnectivityStateName(state_), status_.ToString().c_str(), + xds_override_host_picker.get()); + } + channel_control_helper()->UpdateState(state_, status_, + std::move(xds_override_host_picker)); + } +} + +OrphanablePtr XdsOverrideHostLb::CreateChildPolicyLocked( + const ChannelArgs& args) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.work_serializer = work_serializer(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + std::make_unique(Ref(DEBUG_LOCATION, "Helper")); + OrphanablePtr lb_policy = + MakeOrphanable(std::move(lb_policy_args), + &grpc_lb_xds_override_host_trace); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] Created new child policy handler %p", + this, lb_policy.get()); + } + // Add our interested_parties pollset_set to that of the newly created + // child policy. This will make the child policy progress upon activity on + // this policy, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} + +// +// XdsOverrideHostLb::Helper +// + +RefCountedPtr XdsOverrideHostLb::Helper::CreateSubchannel( + ServerAddress address, const ChannelArgs& args) { + return xds_override_host_policy_->channel_control_helper()->CreateSubchannel( + address, args); +} + +void XdsOverrideHostLb::Helper::UpdateState( + grpc_connectivity_state state, const absl::Status& status, + RefCountedPtr picker) { + if (xds_override_host_policy_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] child connectivity state update: " + "state=%s (%s) picker=%p", + xds_override_host_policy_.get(), ConnectivityStateName(state), + status.ToString().c_str(), picker.get()); + } + // Save the state and picker. + xds_override_host_policy_->state_ = state; + xds_override_host_policy_->status_ = status; + xds_override_host_policy_->picker_ = std::move(picker); + // Wrap the picker and return it to the channel. + xds_override_host_policy_->MaybeUpdatePickerLocked(); +} + +void XdsOverrideHostLb::Helper::RequestReresolution() { + if (xds_override_host_policy_->shutting_down_) return; + xds_override_host_policy_->channel_control_helper()->RequestReresolution(); +} + +absl::string_view XdsOverrideHostLb::Helper::GetAuthority() { + return xds_override_host_policy_->channel_control_helper()->GetAuthority(); +} + +grpc_event_engine::experimental::EventEngine* +XdsOverrideHostLb::Helper::GetEventEngine() { + return xds_override_host_policy_->channel_control_helper()->GetEventEngine(); +} + +void XdsOverrideHostLb::Helper::AddTraceEvent(TraceSeverity severity, + absl::string_view message) { + if (xds_override_host_policy_->shutting_down_) return; + xds_override_host_policy_->channel_control_helper()->AddTraceEvent(severity, + message); +} + +// +// factory +// +const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader( + const JsonArgs&) { + static const auto kJsonLoader = + JsonObjectLoader() + // Child policy config is parsed in JsonPostLoad + .Finish(); + return kJsonLoader; +} + +void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&, + ValidationErrors* errors) { + ValidationErrors::ScopedField field(errors, ".childPolicy"); + auto it = json.object_value().find("childPolicy"); + if (it == json.object_value().end()) { + errors->AddError("field not present"); + } else { + auto child_policy_config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + it->second); + if (!child_policy_config.ok()) { + errors->AddError(child_policy_config.status().message()); + } else { + child_config_ = std::move(*child_policy_config); + } + } +} + +class XdsOverrideHostLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(std::move(args)); + } + + absl::string_view name() const override { return kXdsOverrideHost; } + + absl::StatusOr> + ParseLoadBalancingConfig(const Json& json) const override { + if (json.type() == Json::Type::JSON_NULL) { + // This policy was configured in the deprecated loadBalancingPolicy + // field or in the client API. + return absl::InvalidArgumentError( + "field:loadBalancingPolicy error:xds_override_host policy requires " + "configuration. Please use loadBalancingConfig field of service " + "config instead."); + } + return LoadRefCountedFromJson( + json, JsonArgs(), + "errors validating xds_override_host LB policy config"); + } +}; + +} // namespace + +void RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder* builder) { + builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( + std::make_unique()); +} + +} // namespace grpc_core diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index fb55a55a404..e04efebbe7c 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -27,7 +27,8 @@ namespace grpc_event_engine { namespace experimental { -extern void RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder* builder); +extern void RegisterEventEngineChannelArgPreconditioning( + grpc_core::CoreConfiguration::Builder* builder); } // namespace experimental } // namespace grpc_event_engine @@ -69,7 +70,8 @@ extern void RegisterBinderResolver(CoreConfiguration::Builder* builder); #endif void BuildCoreConfiguration(CoreConfiguration::Builder* builder) { - grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(builder); + grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning( + builder); // The order of the handshaker registration is crucial here. // We want TCP connect handshaker to be registered last so that it is added to // the start of the handshaker list. diff --git a/src/core/plugin_registry/grpc_plugin_registry_extra.cc b/src/core/plugin_registry/grpc_plugin_registry_extra.cc index a2c80cd7d73..6fdc2d52a69 100644 --- a/src/core/plugin_registry/grpc_plugin_registry_extra.cc +++ b/src/core/plugin_registry/grpc_plugin_registry_extra.cc @@ -35,6 +35,8 @@ extern void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterXdsClusterResolverLbPolicy( CoreConfiguration::Builder* builder); +extern void RegisterXdsOverrideHostLbPolicy( + CoreConfiguration::Builder* builder); extern void RegisterXdsWrrLocalityLbPolicy(CoreConfiguration::Builder* builder); extern void RegisterFileWatcherCertificateProvider( CoreConfiguration::Builder* builder); @@ -54,6 +56,7 @@ void RegisterExtraFilters(CoreConfiguration::Builder* builder) { RegisterXdsClusterImplLbPolicy(builder); RegisterCdsLbPolicy(builder); RegisterXdsClusterResolverLbPolicy(builder); + RegisterXdsOverrideHostLbPolicy(builder); RegisterXdsWrrLocalityLbPolicy(builder); RegisterFileWatcherCertificateProvider(builder); #endif diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2959512df71..cdab9772b88 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -51,6 +51,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc', diff --git a/test/core/client_channel/BUILD b/test/core/client_channel/BUILD index 2c68e91c627..1d70b79b98d 100644 --- a/test/core/client_channel/BUILD +++ b/test/core/client_channel/BUILD @@ -120,3 +120,17 @@ grpc_cc_test( "//test/core/util:scoped_env_var", ], ) + +grpc_cc_test( + name = "xds_override_host_lb_config_parser_test", + srcs = ["xds_override_host_lb_config_parser_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + tags = ["no_test_ios"], + deps = [ + "//:grpc", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD index c8f5f963127..8830da2b3ab 100644 --- a/test/core/client_channel/lb_policy/BUILD +++ b/test/core/client_channel/lb_policy/BUILD @@ -62,3 +62,15 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "xds_override_host_test", + srcs = ["xds_override_host_test.cc"], + external_deps = ["gtest"], + language = "C++", + deps = [ + ":lb_policy_test_lib", + "//src/core:grpc_lb_policy_xds_override_host", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 5d6c7926a8b..6b7430bc6ac 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -220,9 +220,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Called at test tear-down time to ensure that we have not left any // unexpected events in the queue. - void ExpectQueueEmpty() { + void ExpectQueueEmpty(SourceLocation location = SourceLocation()) { MutexLock lock(&mu_); - EXPECT_TRUE(queue_.empty()); + EXPECT_TRUE(queue_.empty()) << location.file() << ":" << location.line(); for (const Event& event : queue_) { gpr_log(GPR_ERROR, "UNEXPECTED EVENT LEFT IN QUEUE: %s", EventString(event).c_str()); @@ -375,7 +375,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { LoadBalancingPolicyTest() : work_serializer_(std::make_shared()) {} - ~LoadBalancingPolicyTest() override { + void TearDown() override { // Note: Can't safely trigger this from inside the FakeHelper dtor, // because if there is a picker in the queue that is holding a ref // to the LB policy, that will prevent the LB policy from being @@ -400,11 +400,14 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Creates an LB policy config from json. static RefCountedPtr MakeConfig( - const Json& json) { - return CoreConfiguration::Get() - .lb_policy_registry() - .ParseLoadBalancingConfig(json) - .value(); + const Json& json, SourceLocation location = SourceLocation()) { + auto status_or_config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + json); + EXPECT_TRUE(status_or_config.ok()) + << status_or_config.status() << "\n" + << location.file() << ":" << location.line(); + return status_or_config.value(); } // Converts an address URI into a grpc_resolved_address. @@ -485,20 +488,22 @@ class LoadBalancingPolicyTest : public ::testing::Test { RefCountedPtr WaitForConnected( SourceLocation location = SourceLocation()) { RefCountedPtr final_picker; - WaitForStateUpdate([&](FakeHelper::StateUpdate update) { - if (update.state == GRPC_CHANNEL_CONNECTING) { - EXPECT_TRUE(update.status.ok()) - << update.status << " at " << location.file() << ":" - << location.line(); - ExpectPickQueued(update.picker.get(), location); - return true; // Keep going. - } - EXPECT_EQ(update.state, GRPC_CHANNEL_READY) - << ConnectivityStateName(update.state) << " at " << location.file() - << ":" << location.line(); - final_picker = std::move(update.picker); - return false; // Stop. - }); + WaitForStateUpdate( + [&](FakeHelper::StateUpdate update) { + if (update.state == GRPC_CHANNEL_CONNECTING) { + EXPECT_TRUE(update.status.ok()) + << update.status << " at " << location.file() << ":" + << location.line(); + ExpectPickQueued(update.picker.get(), location); + return true; // Keep going. + } + EXPECT_EQ(update.state, GRPC_CHANNEL_READY) + << ConnectivityStateName(update.state) << " at " + << location.file() << ":" << location.line(); + final_picker = std::move(update.picker); + return false; // Stop. + }, + location); return final_picker; } @@ -644,6 +649,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { return &it->second; } + void ExpectQueueEmpty(SourceLocation location = SourceLocation()) { + helper_->ExpectQueueEmpty(location); + } + std::shared_ptr work_serializer_; FakeHelper* helper_ = nullptr; std::map subchannel_pool_; diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc new file mode 100644 index 00000000000..c758c450d3c --- /dev/null +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -0,0 +1,88 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "gmock/gmock.h" + +#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { +namespace { + +class XdsOverrideHostTest : public LoadBalancingPolicyTest { + protected: + XdsOverrideHostTest() + : policy_(MakeLbPolicy("xds_override_host_experimental")) {} + + RefCountedPtr MakeXdsOverrideHostConfig( + std::string child_policy = "pick_first") { + Json::Object child_policy_config = {{child_policy, Json::Object()}}; + return MakeConfig(Json::Array{Json::Object{ + {"xds_override_host_experimental", + Json::Object{{"childPolicy", Json::Array{{child_policy_config}}}}}}}); + } + + OrphanablePtr policy_; +}; + +TEST_F(XdsOverrideHostTest, DelegatesToChild) { + ASSERT_NE(policy_, nullptr); + const std::array kAddresses = {"ipv4:127.0.0.1:441", + "ipv4:127.0.0.1:442"}; + EXPECT_EQ(policy_->name(), "xds_override_host_experimental"); + // 1. We use pick_first as a child + EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, MakeXdsOverrideHostConfig()), + policy_.get()), + absl::OkStatus()); + ExpectConnectingUpdate(); + auto subchannel = + FindSubchannel({kAddresses[0]}, + ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + ASSERT_NE(subchannel, nullptr); + ASSERT_TRUE(subchannel->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + subchannel = + FindSubchannel({kAddresses[1]}, + ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + ASSERT_NE(subchannel, nullptr); + ASSERT_FALSE(subchannel->ConnectionRequested()); + auto picker = WaitForConnected(); + // Pick first policy will always pick first! + EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441"); + EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441"); +} + +TEST_F(XdsOverrideHostTest, NoConfigReportsError) { + EXPECT_EQ( + ApplyUpdate(BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}), + policy_.get()), + absl::InvalidArgumentError("Missing policy config")); +} + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/test/core/client_channel/xds_override_host_lb_config_parser_test.cc b/test/core/client_channel/xds_override_host_lb_config_parser_test.cc new file mode 100644 index 00000000000..2a556f1db15 --- /dev/null +++ b/test/core/client_channel/xds_override_host_lb_config_parser_test.cc @@ -0,0 +1,119 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/service_config/service_config.h" +#include "src/core/lib/service_config/service_config_impl.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { +namespace { + +TEST(XdsOverrideHostConfigParsingTest, ValidConfig) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":[\n" + " {\"grpclb\":{}}\n" + " ]\n" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_TRUE(service_config.ok()); + EXPECT_NE(*service_config, nullptr); +} + +TEST(XdsOverrideHostConfigParsingTest, ReportsMissingChildPolicyField) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_FALSE(service_config.ok()); + EXPECT_EQ(service_config.status(), + absl::InvalidArgumentError( + "errors validating service config: [field:loadBalancingConfig " + "error:errors validating xds_override_host LB policy config: " + "[field:childPolicy error:field not present]]")); +} + +TEST(XdsOverrideHostConfigParsingTest, ReportsChildPolicyShouldBeArray) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":{\n" + " \"grpclb\":{},\n" + " }\n" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_FALSE(service_config.ok()) << service_config.status(); + EXPECT_EQ(service_config.status(), + absl::InvalidArgumentError( + "errors validating service config: [field:loadBalancingConfig " + "error:errors validating xds_override_host LB policy config: " + "[field:childPolicy error:type should be array]]")); +} + +TEST(XdsOverrideHostConfigParsingTest, ReportsEmptyChildPolicyArray) { + const char* service_config_json = + "{\n" + " \"loadBalancingConfig\":[{\n" + " \"xds_override_host_experimental\":{\n" + " \"childPolicy\":[\n" + " ]\n" + " }\n" + " }]\n" + "}\n"; + auto service_config = + ServiceConfigImpl::Create(ChannelArgs(), service_config_json); + ASSERT_FALSE(service_config.ok()) << service_config.status(); + EXPECT_EQ(service_config.status(), + absl::InvalidArgumentError( + "errors validating service config: [field:loadBalancingConfig " + "error:errors validating xds_override_host LB policy config: " + "[field:childPolicy error:No known policies in list: ]]")); +} +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6df73a4666c..4f9608eccce 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1135,6 +1135,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index bba1ac95db2..dfc07b57004 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -938,6 +938,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc \ +src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 54fb90311b9..4a1894a74ca 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -8575,6 +8575,54 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "xds_override_host_lb_config_parser_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "xds_override_host_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,