xds resolver: fix edge cases in interactions between LDS and RDS (#31668)

* xds resolver: fix edge cases in interactions between LDS and RDS

* improve SwitchFromInlineRouteConfigToRds test

* clang-tidy

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/31675/head
Mark D. Roth 2 years ago committed by GitHub
parent 000c1fc18d
commit 64589d7ff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      CMakeLists.txt
  2. 2
      build_autogenerated.yaml
  3. 42
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  4. 1
      test/cpp/end2end/xds/BUILD
  5. 239
      test/cpp/end2end/xds/xds_routing_end2end_test.cc

8
CMakeLists.txt generated

@ -24551,6 +24551,14 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/fault_common.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h

@ -13235,6 +13235,8 @@ targets:
- src/proto/grpc/testing/xds/v3/endpoint.proto
- src/proto/grpc/testing/xds/v3/expr.proto
- src/proto/grpc/testing/xds/v3/extension.proto
- src/proto/grpc/testing/xds/v3/fault.proto
- src/proto/grpc/testing/xds/v3/fault_common.proto
- src/proto/grpc/testing/xds/v3/http_connection_manager.proto
- src/proto/grpc/testing/xds/v3/http_filter_rbac.proto
- src/proto/grpc/testing/xds/v3/listener.proto

@ -883,26 +883,44 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
&current_listener_.route_config,
// RDS resource name
[&](std::string* rds_name) {
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/!rds_name->empty());
route_config_watcher_ = nullptr;
}
route_config_name_ = std::move(*rds_name);
if (!route_config_name_.empty()) {
current_virtual_host_.routes.clear();
// If the RDS name changed, update the RDS watcher.
// Note that this will be true on the initial update, because
// route_config_name_ will be empty.
if (route_config_name_ != *rds_name) {
// If we already had a watch (i.e., if the previous config had
// a different RDS name), stop the previous watch.
// There will be no previous watch if either (a) this is the
// initial resource update or (b) the previous Listener had an
// inlined RouteConfig.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/true);
route_config_watcher_ = nullptr;
}
// Start watch for the new RDS resource name.
route_config_name_ = std::move(*rds_name);
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get();
XdsRouteConfigResourceType::StartWatch(
xds_client_.get(), route_config_name_, std::move(watcher));
} else {
// RDS resource name has not changed, so no watch needs to be
// updated, but we still need to propagate any changes in the
// HCM config (e.g., the list of HTTP filters).
GenerateResult();
}
// HCM may contain newer filter config. We need to propagate the
// update as config selector to the channel.
GenerateResult();
},
// inlined RouteConfig
[&](XdsRouteConfigResource* route_config) {
// If the previous update specified an RDS resource instead of
// having an inlined RouteConfig, we need to cancel the RDS watch.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_);
route_config_watcher_ = nullptr;
route_config_name_.clear();
}
OnRouteConfigUpdate(std::move(*route_config));
});
}

@ -353,6 +353,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing/xds/v3:fault_proto",
"//src/proto/grpc/testing/xds/v3:router_proto",
"//test/core/util:grpc_test_util",
],

@ -21,6 +21,7 @@
#include <gtest/gtest.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/proto/grpc/testing/xds/v3/fault.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -96,11 +97,13 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) {
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Unset LDS resource and wait for client to ACK the update.
balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
const auto deadline = absl::Now() + absl::Seconds(30);
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK";
response_state = balancer_->ads_service()->lds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Make sure we can still send RPCs.
@ -127,6 +130,240 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) {
WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
using LdsRdsInteractionTest = XdsEnd2endTest;
INSTANTIATE_TEST_SUITE_P(
XdsTest, LdsRdsInteractionTest,
::testing::Values(XdsTestType().set_enable_rds_testing()),
&XdsTestType::Name);
TEST_P(LdsRdsInteractionTest, SwitchFromRdsToInlineRouteConfig) {
CreateAndStartBackends(2);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// RDS should have been ACKed.
auto response_state = balancer_->ads_service()->rds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now recreate the LDS resource with an inline route config pointing to a
// different CDS and EDS resource, pointing to backend 1, and make sure
// the client uses it.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.mutable_route_config() = new_route_config;
ClientHcmAccessor().Pack(http_connection_manager, &listener);
balancer_->ads_service()->SetLdsResource(listener);
// Wait for client to start using backend 1.
WaitForBackend(DEBUG_LOCATION, 1);
// Send an update to the original RDS resource, which the client
// should no longer be subscribed to. We need this RouteConfig to be
// different than the original one so that the update does not get
// squelched by XdsClient, so we add a second domain to the vhost that
// will not actually be used.
new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)->add_domains("foo.example.com");
balancer_->ads_service()->SetRdsResource(new_route_config);
// Wait for RDS ACK to know that the client saw the change.
// TODO(roth): The client does not actually ACK here, it just sends an
// unsubscription request, but our fake xDS server is incorrectly treating
// that as an ACK. When we have time, fix the behavior of the fake
// xDS server, and then change this test to ensure that there is no RDS
// ACK within the 30-second timeout period.
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for RDS ACK";
response_state = balancer_->ads_service()->rds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Make sure RPCs are still going to backend 1. This shows that the
// client did not replace its route config with the one from the RDS
// resource that it should no longer be using.
ResetBackendCounters();
CheckRpcSendOk(DEBUG_LOCATION);
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(1, backends_[1]->backend_service()->request_count());
}
TEST_P(LdsRdsInteractionTest, SwitchFromInlineRouteConfigToRds) {
CreateAndStartBackends(2);
// Create an LDS resource with an inline RouteConfig pointing to a
// different CDS and EDS resource, sending traffic to backend 0.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration route_config = default_route_config_;
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.mutable_route_config() = route_config;
ClientHcmAccessor().Pack(http_connection_manager, &listener);
balancer_->ads_service()->SetLdsResource(listener);
// Start the client and make sure traffic goes to backend 0.
WaitForBackend(DEBUG_LOCATION, 0);
// RDS should not have been ACKed, because the RouteConfig was inlined.
ASSERT_FALSE(balancer_->ads_service()->rds_response_state().has_value());
// Change the LDS resource to point to an RDS resource. The LDS resource
// configures the fault injection filter with a config that fails all RPCs.
// However, the RDS resource has a typed_per_filter_config override that
// disables the fault injection filter. The RDS resource points to a
// new cluster that sends traffic to backend 1.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
route_config = default_route_config_;
auto* config_map = route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_typed_per_filter_config();
(*config_map)["envoy.fault"].PackFrom(
envoy::extensions::filters::http::fault::v3::HTTPFault());
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
listener = default_listener_;
http_connection_manager = ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
route_config);
// Wait for traffic to switch to backend 1. There should be no RPC
// failures here; if there are, that indicates that the client started
// using the new LDS resource before it saw the new RDS resource.
WaitForBackend(DEBUG_LOCATION, 1);
}
TEST_P(LdsRdsInteractionTest, HcmConfigUpdatedWithoutRdsChange) {
CreateAndStartBackends(1);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// LDS should have been ACKed.
auto response_state = balancer_->ads_service()->lds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now update the LDS resource to add the fault injection filter with
// a config that fails all RPCs.
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
default_route_config_);
// Wait for the LDS update to be ACKed.
const auto deadline =
absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK";
response_state = balancer_->ads_service()->lds_response_state();
if (response_state.has_value()) break;
absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor());
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Now RPCs should fail with ABORTED status.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::ABORTED, "Fault injected");
}
TEST_P(LdsRdsInteractionTest, LdsUpdateChangesHcmConfigAndRdsResourceName) {
CreateAndStartBackends(2);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// Change the LDS resource to point to an RDS resource. The LDS resource
// configures the fault injection filter with a config that fails all RPCs.
// However, the RDS resource has a typed_per_filter_config override that
// disables the fault injection filter. The RDS resource points to a
// new cluster that sends traffic to backend 1.
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
RouteConfiguration route_config = default_route_config_;
route_config.set_name("new_route_config");
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
auto* config_map = route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_typed_per_filter_config();
(*config_map)["envoy.fault"].PackFrom(
envoy::extensions::filters::http::fault::v3::HTTPFault());
envoy::extensions::filters::http::fault::v3::HTTPFault http_fault;
auto* abort_percentage = http_fault.mutable_abort()->mutable_percentage();
abort_percentage->set_numerator(100);
abort_percentage->set_denominator(abort_percentage->HUNDRED);
http_fault.mutable_abort()->set_grpc_status(
static_cast<uint32_t>(StatusCode::ABORTED));
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
*http_connection_manager.add_http_filters() =
http_connection_manager.http_filters(0);
auto* filter = http_connection_manager.mutable_http_filters(0);
filter->set_name("envoy.fault");
filter->mutable_typed_config()->PackFrom(http_fault);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
SetListenerAndRouteConfiguration(balancer_.get(), std::move(listener),
route_config);
// Wait for traffic to switch to backend 1. There should be no RPC
// failures here; if there are, that indicates that the client started
// using the new LDS resource before it saw the new RDS resource.
WaitForBackend(DEBUG_LOCATION, 1);
}
using LdsRdsTest = XdsEnd2endTest;
// Test with and without RDS.

Loading…
Cancel
Save