diff --git a/CMakeLists.txt b/CMakeLists.txt index 3497f565a10..3622ef0f6e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24551,6 +24551,14 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index d40655ef6ba..b37f790343b 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -13235,6 +13235,8 @@ targets: - src/proto/grpc/testing/xds/v3/endpoint.proto - src/proto/grpc/testing/xds/v3/expr.proto - src/proto/grpc/testing/xds/v3/extension.proto + - src/proto/grpc/testing/xds/v3/fault.proto + - src/proto/grpc/testing/xds/v3/fault_common.proto - src/proto/grpc/testing/xds/v3/http_connection_manager.proto - src/proto/grpc/testing/xds/v3/http_filter_rbac.proto - src/proto/grpc/testing/xds/v3/listener.proto diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc 
b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index e6ddd072d20..37763df0265 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -883,26 +883,44 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) { &current_listener_.route_config, // RDS resource name [&](std::string* rds_name) { - if (route_config_watcher_ != nullptr) { - XdsRouteConfigResourceType::CancelWatch( - xds_client_.get(), route_config_name_, route_config_watcher_, - /*delay_unsubscription=*/!rds_name->empty()); - route_config_watcher_ = nullptr; - } - route_config_name_ = std::move(*rds_name); - if (!route_config_name_.empty()) { - current_virtual_host_.routes.clear(); + // If the RDS name changed, update the RDS watcher. + // Note that this will be true on the initial update, because + // route_config_name_ will be empty. + if (route_config_name_ != *rds_name) { + // If we already had a watch (i.e., if the previous config had + // a different RDS name), stop the previous watch. + // There will be no previous watch if either (a) this is the + // initial resource update or (b) the previous Listener had an + // inlined RouteConfig. + if (route_config_watcher_ != nullptr) { + XdsRouteConfigResourceType::CancelWatch( + xds_client_.get(), route_config_name_, route_config_watcher_, + /*delay_unsubscription=*/true); + route_config_watcher_ = nullptr; + } + // Start watch for the new RDS resource name. + route_config_name_ = std::move(*rds_name); auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref()); route_config_watcher_ = watcher.get(); XdsRouteConfigResourceType::StartWatch( xds_client_.get(), route_config_name_, std::move(watcher)); + } else { + // RDS resource name has not changed, so no watch needs to be + // updated, but we still need to propagate any changes in the + // HCM config (e.g., the list of HTTP filters). + GenerateResult(); } - // HCM may contain newer filter config. 
We need to propagate the - // update as config selector to the channel. - GenerateResult(); }, // inlined RouteConfig [&](XdsRouteConfigResource* route_config) { + // If the previous update specified an RDS resource instead of + // having an inlined RouteConfig, we need to cancel the RDS watch. + if (route_config_watcher_ != nullptr) { + XdsRouteConfigResourceType::CancelWatch( + xds_client_.get(), route_config_name_, route_config_watcher_); + route_config_watcher_ = nullptr; + route_config_name_.clear(); + } OnRouteConfigUpdate(std::move(*route_config)); }); } diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index ecd312f1c3f..9406fda0503 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -353,6 +353,7 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:grpc++", + "//src/proto/grpc/testing/xds/v3:fault_proto", "//src/proto/grpc/testing/xds/v3:router_proto", "//test/core/util:grpc_test_util", ], diff --git a/test/cpp/end2end/xds/xds_routing_end2end_test.cc b/test/cpp/end2end/xds/xds_routing_end2end_test.cc index aad1224e47b..e0c05c38630 100644 --- a/test/cpp/end2end/xds/xds_routing_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_routing_end2end_test.cc @@ -21,6 +21,7 @@ #include #include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/proto/grpc/testing/xds/v3/fault.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" @@ -96,11 +97,13 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) { EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); // Unset LDS resource and wait for client to ACK the update. 
balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); - const auto deadline = absl::Now() + absl::Seconds(30); + const auto deadline = + absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor()); while (true) { ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK"; response_state = balancer_->ads_service()->lds_response_state(); if (response_state.has_value()) break; + absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor()); } EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); // Make sure we can still send RPCs. @@ -127,6 +130,240 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) { WaitForAllBackends(DEBUG_LOCATION, 1, 2); } +using LdsRdsInteractionTest = XdsEnd2endTest; + +INSTANTIATE_TEST_SUITE_P( + XdsTest, LdsRdsInteractionTest, + ::testing::Values(XdsTestType().set_enable_rds_testing()), + &XdsTestType::Name); + +TEST_P(LdsRdsInteractionTest, SwitchFromRdsToInlineRouteConfig) { + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForBackend(DEBUG_LOCATION, 0); + // RDS should have been ACKed. + auto response_state = balancer_->ads_service()->rds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Now recreate the LDS resource with an inline route config pointing to a + // different CDS and EDS resource, pointing to backend 1, and make sure + // the client uses it. 
+ const char* kNewClusterName = "new_cluster_name"; + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + RouteConfiguration new_route_config = default_route_config_; + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + Listener listener = default_listener_; + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + *http_connection_manager.mutable_route_config() = new_route_config; + ClientHcmAccessor().Pack(http_connection_manager, &listener); + balancer_->ads_service()->SetLdsResource(listener); + // Wait for client to start using backend 1. + WaitForBackend(DEBUG_LOCATION, 1); + // Send an update to the original RDS resource, which the client + // should no longer be subscribed to. We need this RouteConfig to be + // different than the original one so that the update does not get + // squelched by XdsClient, so we add a second domain to the vhost that + // will not actually be used. + new_route_config = default_route_config_; + new_route_config.mutable_virtual_hosts(0)->add_domains("foo.example.com"); + balancer_->ads_service()->SetRdsResource(new_route_config); + // Wait for RDS ACK to know that the client saw the change. + // TODO(roth): The client does not actually ACK here, it just sends an + // unsubscription request, but our fake xDS server is incorrectly treating + // that as an ACK. When we have time, fix the behavior of the fake + // xDS server, and then change this test to ensure that there is no RDS + // ACK within the 30-second timeout period. 
+ const auto deadline = + absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor()); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for RDS ACK"; + response_state = balancer_->ads_service()->rds_response_state(); + if (response_state.has_value()) break; + absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor()); + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Make sure RPCs are still going to backend 1. This shows that the + // client did not replace its route config with the one from the RDS + // resource that it should no longer be using. + ResetBackendCounters(); + CheckRpcSendOk(DEBUG_LOCATION); + EXPECT_EQ(0, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(1, backends_[1]->backend_service()->request_count()); +} + +TEST_P(LdsRdsInteractionTest, SwitchFromInlineRouteConfigToRds) { + CreateAndStartBackends(2); + // Create an LDS resource with an inline RouteConfig pointing to a + // different CDS and EDS resource, sending traffic to backend 0. 
+ const char* kNewClusterName = "new_cluster_name"; + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + RouteConfiguration route_config = default_route_config_; + route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + Listener listener = default_listener_; + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + *http_connection_manager.mutable_route_config() = route_config; + ClientHcmAccessor().Pack(http_connection_manager, &listener); + balancer_->ads_service()->SetLdsResource(listener); + // Start the client and make sure traffic goes to backend 0. + WaitForBackend(DEBUG_LOCATION, 0); + // RDS should not have been ACKed, because the RouteConfig was inlined. + ASSERT_FALSE(balancer_->ads_service()->rds_response_state().has_value()); + // Change the LDS resource to point to an RDS resource. The LDS resource + // configures the fault injection filter with a config that fails all RPCs. + // However, the RDS resource has a typed_per_filter_config override that + // disables the fault injection filter. The RDS resource points to a + // new cluster that sends traffic to backend 1. 
+ args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + route_config = default_route_config_; + auto* config_map = route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_typed_per_filter_config(); + (*config_map)["envoy.fault"].PackFrom( + envoy::extensions::filters::http::fault::v3::HTTPFault()); + envoy::extensions::filters::http::fault::v3::HTTPFault http_fault; + auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage(); + abort_percentage->set_numerator(100); + abort_percentage->set_denominator(abort_percentage->HUNDRED); + http_fault.mutable_abort()->set_grpc_status( + static_cast(StatusCode::ABORTED)); + listener = default_listener_; + http_connection_manager = ClientHcmAccessor().Unpack(listener); + *http_connection_manager.add_http_filters() = + http_connection_manager.http_filters(0); + auto* filter = http_connection_manager.mutable_http_filters(0); + filter->set_name("envoy.fault"); + filter->mutable_typed_config()->PackFrom(http_fault); + ClientHcmAccessor().Pack(http_connection_manager, &listener); + SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener), + route_config); + // Wait for traffic to switch to backend 1. There should be no RPC + // failures here; if there are, that indicates that the client started + // using the new LDS resource before it saw the new RDS resource. + WaitForBackend(DEBUG_LOCATION, 1); +} + +TEST_P(LdsRdsInteractionTest, HcmConfigUpdatedWithoutRdsChange) { + CreateAndStartBackends(1); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForBackend(DEBUG_LOCATION, 0); + // LDS should have been ACKed. 
+ auto response_state = balancer_->ads_service()->lds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Now update the LDS resource to add the fault injection filter with + // a config that fails all RPCs. + envoy::extensions::filters::http::fault::v3::HTTPFault http_fault; + auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage(); + abort_percentage->set_numerator(100); + abort_percentage->set_denominator(abort_percentage->HUNDRED); + http_fault.mutable_abort()->set_grpc_status( + static_cast(StatusCode::ABORTED)); + Listener listener = default_listener_; + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + *http_connection_manager.add_http_filters() = + http_connection_manager.http_filters(0); + auto* filter = http_connection_manager.mutable_http_filters(0); + filter->set_name("envoy.fault"); + filter->mutable_typed_config()->PackFrom(http_fault); + ClientHcmAccessor().Pack(http_connection_manager, &listener); + SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener), + default_route_config_); + // Wait for the LDS update to be ACKed. + const auto deadline = + absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor()); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK"; + response_state = balancer_->ads_service()->lds_response_state(); + if (response_state.has_value()) break; + absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor()); + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Now RPCs should fail with ABORTED status. + CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::ABORTED, "Fault injected"); +} + +TEST_P(LdsRdsInteractionTest, LdsUpdateChangesHcmConfigAndRdsResourceName) { + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. 
+ EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForBackend(DEBUG_LOCATION, 0); + // Change the LDS resource to point to an RDS resource. The LDS resource + // configures the fault injection filter with a config that fails all RPCs. + // However, the RDS resource has a typed_per_filter_config override that + // disables the fault injection filter. The RDS resource points to a + // new cluster that sends traffic to backend 1. + const char* kNewClusterName = "new_cluster_name"; + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + RouteConfiguration route_config = default_route_config_; + route_config.set_name("new_route_config"); + route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + auto* config_map = route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_typed_per_filter_config(); + (*config_map)["envoy.fault"].PackFrom( + envoy::extensions::filters::http::fault::v3::HTTPFault()); + envoy::extensions::filters::http::fault::v3::HTTPFault http_fault; + auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage(); + abort_percentage->set_numerator(100); + abort_percentage->set_denominator(abort_percentage->HUNDRED); + http_fault.mutable_abort()->set_grpc_status( + static_cast(StatusCode::ABORTED)); + Listener listener = default_listener_; + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + *http_connection_manager.add_http_filters() = + 
http_connection_manager.http_filters(0); + auto* filter = http_connection_manager.mutable_http_filters(0); + filter->set_name("envoy.fault"); + filter->mutable_typed_config()->PackFrom(http_fault); + ClientHcmAccessor().Pack(http_connection_manager, &listener); + SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener), + route_config); + // Wait for traffic to switch to backend 1. There should be no RPC + // failures here; if there are, that indicates that the client started + // using the new LDS resource before it saw the new RDS resource. + WaitForBackend(DEBUG_LOCATION, 1); +} + using LdsRdsTest = XdsEnd2endTest; // Test with and without RDS.