diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e036610a6db..c36766a6dc3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -267,8 +267,9 @@ class ChannelData { void UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, + RefCountedPtr config_selector, const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - const char* lb_policy_name, const grpc_channel_args* args); + const char* lb_policy_name); void UpdateServiceConfigInDataPlaneLocked(); @@ -1480,6 +1481,7 @@ ChannelData::ChannelConfigHelper::ChooseServiceConfig( const Resolver::Result& result) { ChooseServiceConfigResult service_config_result; RefCountedPtr service_config; + RefCountedPtr config_selector; if (result.service_config_error != GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s", @@ -1495,6 +1497,7 @@ ChannelData::ChannelConfigHelper::ChooseServiceConfig( chand_); } service_config = chand_->saved_service_config_; + config_selector = chand_->saved_config_selector_; } else { // No previously returned config, so put the channel into // TRANSIENT_FAILURE. @@ -1511,8 +1514,9 @@ ChannelData::ChannelConfigHelper::ChooseServiceConfig( } service_config = chand_->default_service_config_; } else { - // Use service config returned by resolver. + // Use ServiceConfig and ConfigSelector returned by resolver. service_config = result.service_config; + config_selector = ConfigSelector::GetFromChannelArgs(*result.args); } GPR_ASSERT(service_config != nullptr); // Extract global config for client channel. @@ -1523,16 +1527,23 @@ ChannelData::ChannelConfigHelper::ChooseServiceConfig( // Find LB policy config. ChooseLbPolicy(result, parsed_service_config, &service_config_result.lb_policy_config); - // Check if the config has changed. - service_config_result.service_config_changed = + // Check if the ServiceConfig has changed. + const bool service_config_changed = chand_->saved_service_config_ == nullptr || service_config->json_string() != chand_->saved_service_config_->json_string(); + // Check if the ConfigSelector has changed. + const bool config_selector_changed = !ConfigSelector::Equals( + chand_->saved_config_selector_.get(), config_selector.get()); + // Indicate a change if either the ServiceConfig or ConfigSelector have + // changed. + service_config_result.service_config_changed = + service_config_changed || config_selector_changed; // If it has, apply the global parameters now. if (service_config_result.service_config_changed) { chand_->UpdateServiceConfigInControlPlaneLocked( - std::move(service_config), parsed_service_config, - service_config_result.lb_policy_config->name(), result.args); + std::move(service_config), std::move(config_selector), + parsed_service_config, service_config_result.lb_policy_config->name()); } // Return results. return service_config_result; @@ -1814,8 +1825,9 @@ void ChannelData::UpdateStateAndPickerLocked( void ChannelData::UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, + RefCountedPtr config_selector, const internal::ClientChannelGlobalParsedConfig* parsed_service_config, - const char* lb_policy_name, const grpc_channel_args* args) { + const char* lb_policy_name) { grpc_core::UniquePtr service_config_json( gpr_strdup(service_config->json_string().c_str())); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { @@ -1825,13 +1837,20 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked( } // Save service config. saved_service_config_ = std::move(service_config); - // Save health check service name. - health_check_service_name_.reset( - gpr_strdup(parsed_service_config->health_check_service_name())); - // Update health check service name used by existing subchannel wrappers. - for (auto* subchannel_wrapper : subchannel_wrappers_) { - subchannel_wrapper->UpdateHealthCheckServiceName(grpc_core::UniquePtr( - gpr_strdup(health_check_service_name_.get()))); + // Update health check service name if needed. + if (((health_check_service_name_ == nullptr) != + (parsed_service_config->health_check_service_name() == nullptr)) || + (health_check_service_name_ != nullptr && + strcmp(health_check_service_name_.get(), + parsed_service_config->health_check_service_name()) != 0)) { + health_check_service_name_.reset( + gpr_strdup(parsed_service_config->health_check_service_name())); + // Update health check service name used by existing subchannel wrappers. + for (auto* subchannel_wrapper : subchannel_wrappers_) { + subchannel_wrapper->UpdateHealthCheckServiceName( + grpc_core::UniquePtr( + gpr_strdup(health_check_service_name_.get()))); + } } // Swap out the data used by GetChannelInfo(). grpc_core::UniquePtr lb_policy_name_owned(gpr_strdup(lb_policy_name)); @@ -1841,7 +1860,11 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked( info_service_config_json_ = std::move(service_config_json); } // Save config selector. - saved_config_selector_ = ConfigSelector::GetFromChannelArgs(*args); + saved_config_selector_ = std::move(config_selector); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this, + saved_config_selector_.get()); + } } void ChannelData::UpdateServiceConfigInDataPlaneLocked() { @@ -1862,6 +1885,10 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { RefCountedPtr service_config = saved_service_config_; // Grab ref to config selector. Use default if resolver didn't supply one. RefCountedPtr config_selector = saved_config_selector_; + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this, + saved_config_selector_.get()); + } if (config_selector == nullptr) { config_selector = MakeRefCounted(saved_service_config_); diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h index 895e15f56f2..30457d701d6 100644 --- a/src/core/ext/filters/client_channel/config_selector.h +++ b/src/core/ext/filters/client_channel/config_selector.h @@ -65,6 +65,19 @@ class ConfigSelector : public RefCounted { virtual ~ConfigSelector() = default; + virtual const char* name() const = 0; + + // Will be called only if the two objects have the same name, so + // subclasses can be free to safely down-cast the argument. + virtual bool Equals(const ConfigSelector* other) const = 0; + + static bool Equals(const ConfigSelector* cs1, const ConfigSelector* cs2) { + if (cs1 == nullptr) return cs2 == nullptr; + if (cs2 == nullptr) return false; + if (strcmp(cs1->name(), cs2->name()) != 0) return false; + return cs1->Equals(cs2); + } + virtual CallConfig GetCallConfig(GetCallConfigArgs args) = 0; grpc_arg MakeChannelArg() const; @@ -83,6 +96,12 @@ class DefaultConfigSelector : public ConfigSelector { GPR_DEBUG_ASSERT(service_config_ != nullptr); } + const char* name() const override { return "default"; } + + // Only comparing the ConfigSelector itself, not the underlying + // service config, so we always return true. + bool Equals(const ConfigSelector* other) const override { return true; } + CallConfig GetCallConfig(GetCallConfigArgs args) override { CallConfig call_config; call_config.method_configs = diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index c017538ce07..4d3cb3b8fa4 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -115,6 +115,16 @@ class XdsResolver : public Resolver { XdsConfigSelector(RefCountedPtr resolver, const std::vector& routes); ~XdsConfigSelector(); + + const char* name() const override { return "XdsConfigSelector"; } + + bool Equals(const ConfigSelector* other) const override { + const auto* other_xds = static_cast(other); + // Don't need to compare resolver_, since that will always be the same. + return route_table_ == other_xds->route_table_ && + clusters_ == other_xds->clusters_; + } + CallConfig GetCallConfig(GetCallConfigArgs args) override; private: @@ -122,6 +132,10 @@ class XdsResolver : public Resolver { XdsApi::Route route; absl::InlinedVector, 2> weighted_cluster_state; + bool operator==(const Route& other) const { + return route == other.route && + weighted_cluster_state == other.weighted_cluster_state; + } }; using RouteTable = std::vector; @@ -229,6 +243,10 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( RefCountedPtr resolver, const std::vector& routes) : resolver_(std::move(resolver)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", + resolver_.get(), this); + } // 1. Construct the route table // 2 Update resolver's cluster state map // 3. Construct cluster list to hold on to entries in the cluster state @@ -240,6 +258,10 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( // invalid data. route_table_.reserve(routes.size()); for (auto& route : routes) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", + resolver_.get(), this, route.ToString().c_str()); + } route_table_.emplace_back(); auto& route_entry = route_table_.back(); route_entry.route = route; @@ -258,6 +280,10 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( } XdsResolver::XdsConfigSelector::~XdsConfigSelector() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", + resolver_.get(), this); + } clusters_.clear(); resolver_->MaybeRemoveUnusedClusters(); } diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index e8797ceb527..9c9f0a92f37 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -26,7 +26,11 @@ #include #include +#include +#include + #include "absl/strings/str_cat.h" +#include "absl/types/optional.h" #include #include @@ -38,9 +42,6 @@ #include #include -#include "absl/strings/str_cat.h" -#include "absl/types/optional.h" - #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" @@ -76,9 +77,6 @@ #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h" -#include -#include - namespace grpc { namespace testing { namespace { @@ -808,10 +806,7 @@ class AdsServiceImpl : public std::enable_shared_from_this { } } // As long as the test did not tell us to ignore this type of - // request, we will loop through all resources to: - // 1. subscribe if necessary - // 2. update if necessary - // 3. unsubscribe if necessary + // request, look at all the resource names. if (parent_->resource_types_to_ignore_.find(v3_resource_type) == parent_->resource_types_to_ignore_.end()) { auto& subscription_name_map = @@ -826,9 +821,11 @@ class AdsServiceImpl : public std::enable_shared_from_this { auto& subscription_state = subscription_name_map[resource_name]; auto& resource_state = resource_name_map[resource_name]; + // Subscribe if needed. parent_->MaybeSubscribe(v3_resource_type, resource_name, &subscription_state, &resource_state, &update_queue); + // Send update if needed. if (ClientNeedsResourceUpdate(resource_state, &subscription_state)) { gpr_log(GPR_INFO, @@ -4137,6 +4134,71 @@ TEST_P(LdsRdsTest, XdsRoutingHeadersMatchingUnmatchCases) { EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED); } +TEST_P(LdsRdsTest, XdsRoutingChangeRoutesWithoutChangingClusters) { + const char* kNewClusterName = "new_cluster"; + const char* kNewEdsServiceName = "new_eds_service_name"; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Populate new EDS resources. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 1)}, + }); + AdsServiceImpl::EdsResourceArgs args1({ + {"locality0", GetBackendPorts(1, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args1, kNewEdsServiceName)); + // Populate new CDS resources. + Cluster new_cluster = balancers_[0]->ads_service()->default_cluster(); + new_cluster.set_name(kNewClusterName); + new_cluster.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName); + balancers_[0]->ads_service()->SetCdsResource(new_cluster); + // Populating Route Configurations for LDS. + RouteConfiguration route_config = + balancers_[0]->ads_service()->default_route_config(); + auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0); + route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/"); + route1->mutable_route()->set_cluster(kNewClusterName); + auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes(); + default_route->mutable_match()->set_prefix(""); + default_route->mutable_route()->set_cluster(kDefaultClusterName); + SetRouteConfiguration(0, route_config); + // Make sure all backends are up and that requests for each RPC + // service go to the right backends. + WaitForAllBackends(0, 1, false); + WaitForAllBackends(1, 2, false, RpcOptions().set_rpc_service(SERVICE_ECHO1)); + WaitForAllBackends(0, 1, false, RpcOptions().set_rpc_service(SERVICE_ECHO2)); + // Requests for services Echo and Echo2 should have gone to backend 0. + EXPECT_EQ(1, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[0]->backend_service1()->request_count()); + EXPECT_EQ(1, backends_[0]->backend_service2()->request_count()); + // Requests for service Echo1 should have gone to backend 1. + EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(1, backends_[1]->backend_service1()->request_count()); + EXPECT_EQ(0, backends_[1]->backend_service2()->request_count()); + // Now send an update that changes the first route to match a + // different RPC service, and wait for the client to make the change. + route1->mutable_match()->set_prefix("/grpc.testing.EchoTest2Service/"); + SetRouteConfiguration(0, route_config); + WaitForAllBackends(1, 2, true, RpcOptions().set_rpc_service(SERVICE_ECHO2)); + // Now repeat the earlier test, making sure all traffic goes to the + // right place. + WaitForAllBackends(0, 1, false); + WaitForAllBackends(0, 1, false, RpcOptions().set_rpc_service(SERVICE_ECHO1)); + WaitForAllBackends(1, 2, false, RpcOptions().set_rpc_service(SERVICE_ECHO2)); + // Requests for services Echo and Echo1 should have gone to backend 0. + EXPECT_EQ(1, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(1, backends_[0]->backend_service1()->request_count()); + EXPECT_EQ(0, backends_[0]->backend_service2()->request_count()); + // Requests for service Echo2 should have gone to backend 1. + EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[1]->backend_service1()->request_count()); + EXPECT_EQ(1, backends_[1]->backend_service2()->request_count()); +} + using CdsTest = BasicTest; // Tests that CDS client should send an ACK upon correct CDS response.