Second attempt: XdsClient: fix resource timeout behavior (#28088)

* Revert "Revert "XdsClient: fix resource timeout behavior (#27860)" (#28032)"

This reverts commit 817eed0928.

* use the right status code enum
pull/28104/head
Mark D. Roth 3 years ago committed by GitHub
parent 2fd63f2f81
commit fff84402ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/ext/xds/xds_api.cc
  2. 180
      src/core/ext/xds/xds_client.cc
  3. 4
      src/core/ext/xds/xds_client.h
  4. 201
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -1909,7 +1909,6 @@ grpc_error_handle RouteConfigParse(
const EncodingContext& context,
const envoy_config_route_v3_RouteConfiguration* route_config,
bool /*is_v2*/, XdsApi::RdsUpdate* rds_update) {
MaybeLogRouteConfiguration(context, route_config);
// Get the virtual hosts.
size_t num_virtual_hosts;
const envoy_config_route_v3_VirtualHost* const* virtual_hosts =

@ -173,10 +173,10 @@ class XdsClient::ChannelState::AdsCallState
bool seen_response() const { return seen_response_; }
void SubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& resource)
const XdsApi::ResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void UnsubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& resource,
const XdsApi::ResourceName& name,
bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
@ -185,24 +185,20 @@ class XdsClient::ChannelState::AdsCallState
private:
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
ResourceState(const std::string& type_url,
const XdsApi::ResourceName& resource,
bool sent_initial_request)
: type_url_(type_url),
resource_(resource),
sent_initial_request_(sent_initial_request) {
ResourceState(const std::string& type_url, const XdsApi::ResourceName& name)
: type_url_(type_url), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
void Orphan() override {
Finish();
MaybeCancelTimer();
Unref(DEBUG_LOCATION, "Orphan");
}
void Start(RefCountedPtr<AdsCallState> ads_calld) {
if (sent_initial_request_) return;
sent_initial_request_ = true;
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld) {
if (timer_started_) return;
timer_started_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
@ -212,7 +208,7 @@ class XdsClient::ChannelState::AdsCallState
&timer_callback_);
}
void Finish() {
void MaybeCancelTimer() {
if (timer_pending_) {
grpc_timer_cancel(&timer_);
timer_pending_ = false;
@ -239,8 +235,8 @@ class XdsClient::ChannelState::AdsCallState
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
"timeout obtaining resource {type=%s name=%s} from xds server",
type_url_,
XdsApi::ConstructFullResourceName(resource_.authority,
type_url_, resource_.id)));
XdsApi::ConstructFullResourceName(name_.authority, type_url_,
name_.id)));
watcher_error = grpc_error_set_int(
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -248,28 +244,27 @@ class XdsClient::ChannelState::AdsCallState
grpc_error_std_string(watcher_error).c_str());
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[resource_.authority];
ads_calld_->xds_client()->authority_state_map_[name_.authority];
if (type_url_ == XdsApi::kLdsTypeUrl) {
ListenerState& state = authority_state.listener_map[resource_.id];
ListenerState& state = authority_state.listener_map[name_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kRdsTypeUrl) {
RouteConfigState& state =
authority_state.route_config_map[resource_.id];
RouteConfigState& state = authority_state.route_config_map[name_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kCdsTypeUrl) {
ClusterState& state = authority_state.cluster_map[resource_.id];
ClusterState& state = authority_state.cluster_map[name_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kEdsTypeUrl) {
EndpointState& state = authority_state.endpoint_map[resource_.id];
EndpointState& state = authority_state.endpoint_map[name_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
@ -283,10 +278,10 @@ class XdsClient::ChannelState::AdsCallState
}
const std::string type_url_;
const XdsApi::ResourceName resource_;
const XdsApi::ResourceName name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool sent_initial_request_;
bool timer_started_ = false;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
@ -618,7 +613,7 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
}
void XdsClient::ChannelState::SubscribeLocked(
const std::string& type_url, const XdsApi::ResourceName& resource) {
const std::string& type_url, const XdsApi::ResourceName& name) {
if (ads_calld_ == nullptr) {
// Start the ADS call if this is the first request.
ads_calld_.reset(new RetryableCall<AdsCallState>(
@ -632,16 +627,16 @@ void XdsClient::ChannelState::SubscribeLocked(
// because when the call is restarted it will resend all necessary requests.
if (ads_calld() == nullptr) return;
// Subscribe to this resource if the ADS call is active.
ads_calld()->SubscribeLocked(type_url, resource);
ads_calld()->SubscribeLocked(type_url, name);
}
void XdsClient::ChannelState::UnsubscribeLocked(
const std::string& type_url, const XdsApi::ResourceName& resource,
const std::string& type_url, const XdsApi::ResourceName& name,
bool delay_unsubscription) {
if (ads_calld_ != nullptr) {
auto* calld = ads_calld_->calld();
if (calld != nullptr) {
calld->UnsubscribeLocked(type_url, resource, delay_unsubscription);
calld->UnsubscribeLocked(type_url, name, delay_unsubscription);
if (!calld->HasSubscribedResources()) {
ads_calld_.reset();
}
@ -942,25 +937,23 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
}
void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
const std::string& type_url, const XdsApi::ResourceName& resource) {
auto& state = state_map_[type_url]
.subscribed_resources[resource.authority][resource.id];
const std::string& type_url, const XdsApi::ResourceName& name) {
auto& state =
state_map_[type_url].subscribed_resources[name.authority][name.id];
if (state == nullptr) {
state = MakeOrphanable<ResourceState>(
type_url, resource,
!chand()->resource_type_version_map_[type_url].empty());
state = MakeOrphanable<ResourceState>(type_url, name);
SendMessageLocked(type_url);
}
}
void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
const std::string& type_url, const XdsApi::ResourceName& resource,
const std::string& type_url, const XdsApi::ResourceName& name,
bool delay_unsubscription) {
auto& type_state_map = state_map_[type_url];
auto& authority_map = type_state_map.subscribed_resources[resource.authority];
authority_map.erase(resource.id);
auto& authority_map = type_state_map.subscribed_resources[name.authority];
authority_map.erase(name.id);
if (authority_map.empty()) {
type_state_map.subscribed_resources.erase(resource.authority);
type_state_map.subscribed_resources.erase(name.authority);
}
if (!delay_unsubscription) SendMessageLocked(type_url);
}
@ -1001,15 +994,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
std::set<std::string> rds_resource_names_seen;
for (auto& p : lds_update_map) {
const XdsApi::ResourceName& resource = p.first;
const XdsApi::ResourceName& name = p.first;
XdsApi::LdsUpdate& lds_update = p.second.resource;
auto& state =
lds_state.subscribed_resources[resource.authority][resource.id];
if (state != nullptr) state->Finish();
auto it = lds_state.subscribed_resources.find(name.authority);
if (it != lds_state.subscribed_resources.end()) {
auto res_it = it->second.find(name.id);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kLdsTypeUrl, resource.id)
XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kLdsTypeUrl, name.id)
.c_str(),
lds_update.ToString().c_str());
}
@ -1018,10 +1015,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
rds_resource_names_seen.insert(
lds_update.http_connection_manager.route_config_name);
}
ListenerState& listener_state =
xds_client()
->authority_state_map_[resource.authority]
.listener_map[resource.id];
ListenerState& listener_state = xds_client()
->authority_state_map_[name.authority]
.listener_map[name.id];
// Ignore identical update.
if (listener_state.update.has_value() &&
*listener_state.update == lds_update) {
@ -1030,8 +1026,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
"[xds_client %p] LDS update for %s identical to current, "
"ignoring.",
xds_client(),
XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kLdsTypeUrl, resource.id)
XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kLdsTypeUrl, name.id)
.c_str());
}
continue;
@ -1055,14 +1051,14 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const auto& resource : resource_names_failed) {
for (const auto& name : resource_names_failed) {
auto& listener_map =
xds_client()->authority_state_map_[resource.authority].listener_map;
auto it = listener_map.find(resource.id);
xds_client()->authority_state_map_[name.authority].listener_map;
auto it = listener_map.find(name.id);
if (it != listener_map.end()) {
auto& update = it->second.update;
if (!update.has_value()) continue;
lds_update_map[resource];
lds_update_map[name];
if (!update->http_connection_manager.route_config_name.empty()) {
rds_resource_names_seen.insert(
update->http_connection_manager.route_config_name);
@ -1128,19 +1124,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
}
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
for (auto& p : rds_update_map) {
const XdsApi::ResourceName& resource = p.first;
const XdsApi::ResourceName& name = p.first;
XdsApi::RdsUpdate& rds_update = p.second.resource;
auto& state =
rds_state.subscribed_resources[resource.authority][resource.id];
if (state != nullptr) state->Finish();
auto it = rds_state.subscribed_resources.find(name.authority);
if (it != rds_state.subscribed_resources.end()) {
auto res_it = it->second.find(name.id);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
rds_update.ToString().c_str());
}
RouteConfigState& route_config_state =
xds_client()
->authority_state_map_[resource.authority]
.route_config_map[resource.id];
->authority_state_map_[name.authority]
.route_config_map[name.id];
// Ignore identical update.
if (route_config_state.update.has_value() &&
*route_config_state.update == rds_update) {
@ -1182,27 +1182,30 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
std::set<std::string> eds_resource_names_seen;
for (auto& p : cds_update_map) {
const XdsApi::ResourceName& resource = p.first;
const XdsApi::ResourceName& name = p.first;
XdsApi::CdsUpdate& cds_update = p.second.resource;
auto& state =
cds_state.subscribed_resources[resource.authority][resource.id];
if (state != nullptr) state->Finish();
auto it = cds_state.subscribed_resources.find(name.authority);
if (it != cds_state.subscribed_resources.end()) {
auto res_it = it->second.find(name.id);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kCdsTypeUrl, resource.id)
XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kCdsTypeUrl, name.id)
.c_str(),
cds_update.ToString().c_str());
}
// Record the EDS resource names seen.
eds_resource_names_seen.insert(
cds_update.eds_service_name.empty()
? XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kCdsTypeUrl, resource.id)
? XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kCdsTypeUrl, name.id)
: cds_update.eds_service_name);
ClusterState& cluster_state = xds_client()
->authority_state_map_[resource.authority]
.cluster_map[resource.id];
ClusterState& cluster_state =
xds_client()->authority_state_map_[name.authority].cluster_map[name.id];
// Ignore identical update.
if (cluster_state.update.has_value() &&
*cluster_state.update == cds_update) {
@ -1232,18 +1235,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const auto& resource : resource_names_failed) {
for (const auto& name : resource_names_failed) {
auto& cluster_map =
xds_client()->authority_state_map_[resource.authority].cluster_map;
auto it = cluster_map.find(resource.id);
xds_client()->authority_state_map_[name.authority].cluster_map;
auto it = cluster_map.find(name.id);
if (it != cluster_map.end()) {
auto& update = it->second.update;
if (!update.has_value()) continue;
cds_update_map[resource];
cds_update_map[name];
eds_resource_names_seen.insert(
update->eds_service_name.empty()
? XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kCdsTypeUrl, resource.id)
? XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kCdsTypeUrl, name.id)
: update->eds_service_name);
}
}
@ -1304,22 +1307,25 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
}
auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
for (auto& p : eds_update_map) {
const XdsApi::ResourceName& resource = p.first;
const XdsApi::ResourceName& name = p.first;
XdsApi::EdsUpdate& eds_update = p.second.resource;
auto& state =
eds_state.subscribed_resources[resource.authority][resource.id];
if (state != nullptr) state->Finish();
auto it = eds_state.subscribed_resources.find(name.authority);
if (it != eds_state.subscribed_resources.end()) {
auto res_it = it->second.find(name.id);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
XdsApi::ConstructFullResourceName(
resource.authority, XdsApi::kCdsTypeUrl, resource.id)
XdsApi::ConstructFullResourceName(name.authority,
XdsApi::kCdsTypeUrl, name.id)
.c_str(),
eds_update.ToString().c_str());
}
EndpointState& endpoint_state =
xds_client()
->authority_state_map_[resource.authority]
.endpoint_map[resource.id];
EndpointState& endpoint_state = xds_client()
->authority_state_map_[name.authority]
.endpoint_map[name.id];
// Ignore identical update.
if (endpoint_state.update.has_value() &&
*endpoint_state.update == eds_update) {
@ -1605,7 +1611,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
for (auto& p : a.second) {
resource_map[a.first].insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
state->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceState"));
}
}
}

@ -235,10 +235,10 @@ class XdsClient : public DualRefCounted<XdsClient> {
void CancelConnectivityWatchLocked();
void SubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& resource)
const XdsApi::ResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void UnsubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& resource,
const XdsApi::ResourceName& name,
bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

@ -999,11 +999,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
EchoResponse* response) {
switch (rpc_options.method) {
case METHOD_ECHO:
return (*stub)->Echo(context, request, response);
return stub->Echo(context, request, response);
case METHOD_ECHO1:
return (*stub)->Echo1(context, request, response);
return stub->Echo1(context, request, response);
case METHOD_ECHO2:
return (*stub)->Echo2(context, request, response);
return stub->Echo2(context, request, response);
}
GPR_UNREACHABLE_CODE();
}
@ -1240,16 +1240,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
Status status;
switch (rpc_options.service) {
case SERVICE_ECHO:
status =
SendRpcMethod(&stub_, rpc_options, &context, request, response);
status = SendRpcMethod(stub_.get(), rpc_options, &context, request,
response);
break;
case SERVICE_ECHO1:
status =
SendRpcMethod(&stub1_, rpc_options, &context, request, response);
status = SendRpcMethod(stub1_.get(), rpc_options, &context, request,
response);
break;
case SERVICE_ECHO2:
status =
SendRpcMethod(&stub2_, rpc_options, &context, request, response);
status = SendRpcMethod(stub2_.get(), rpc_options, &context, request,
response);
break;
}
if (local_response) delete response;
@ -9865,39 +9865,198 @@ class TimeoutTest : public XdsEnd2endTest {
: XdsEnd2endTest(/* num_backends= */ 4, /* num_balancers= */ 1,
/*client_load_reporting_interval_seconds= */ 100,
/* xds_resource_does_not_exist_timeout_ms */ 500,
/* use_xds_enabled_server= */ false) {}
/* use_xds_enabled_server= */ false) {
StartAllBackends();
}
};
// Tests that LDS client times out when no response received.
TEST_P(TimeoutTest, Lds) {
TEST_P(TimeoutTest, LdsServerIgnoresRequest) {
balancers_[0]->ads_service()->IgnoreResourceType(kLdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, Rds) {
TEST_P(TimeoutTest, LdsResourceNotPresentInRequest) {
balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, LdsSecondResourceNotPresentInRequest) {
ASSERT_NE(GetParam().bootstrap_source(), TestType::kBootstrapFromChannelArg)
<< "This test cannot use bootstrap from channel args, because it "
"needs two channels to use the same XdsClient instance.";
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
WaitForAllBackends();
// Create second channel for a new server name.
// This should fail because there is no LDS resource for this server name.
auto channel2 =
CreateChannel(/*failover_timeout=*/0, "new-server.example.com");
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
EchoResponse response;
RpcOptions rpc_options;
rpc_options.SetupRpc(&context, &request);
auto status =
SendRpcMethod(stub2.get(), rpc_options, &context, request, &response);
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
}
TEST_P(TimeoutTest, RdsServerIgnoresRequest) {
balancers_[0]->ads_service()->IgnoreResourceType(kRdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
// Tests that CDS client times out when no response received.
TEST_P(TimeoutTest, Cds) {
TEST_P(TimeoutTest, RdsResourceNotPresentInRequest) {
balancers_[0]->ads_service()->UnsetResource(kRdsTypeUrl,
kDefaultRouteConfigurationName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, RdsSecondResourceNotPresentInRequest) {
ASSERT_NE(GetParam().bootstrap_source(), TestType::kBootstrapFromChannelArg)
<< "This test cannot use bootstrap from channel args, because it "
"needs two channels to use the same XdsClient instance.";
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Add listener for 2nd channel, but no RDS resource.
const char* kNewServerName = "new-server.example.com";
Listener listener = default_listener_;
listener.set_name(kNewServerName);
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
auto* rds = http_connection_manager.mutable_rds();
rds->set_route_config_name("rds_resource_does_not_exist");
rds->mutable_config_source()->mutable_ads();
ClientHcmAccessor().Pack(http_connection_manager, &listener);
balancers_[0]->ads_service()->SetLdsResource(listener);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
WaitForAllBackends();
// Create second channel for a new server name.
// This should fail because the LDS resource points to a non-existent RDS
// resource.
auto channel2 = CreateChannel(/*failover_timeout=*/0, kNewServerName);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
EchoResponse response;
RpcOptions rpc_options;
rpc_options.SetupRpc(&context, &request);
auto status =
SendRpcMethod(stub2.get(), rpc_options, &context, request, &response);
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
}
TEST_P(TimeoutTest, CdsServerIgnoresRequest) {
balancers_[0]->ads_service()->IgnoreResourceType(kCdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, Eds) {
TEST_P(TimeoutTest, CdsResourceNotPresentInRequest) {
balancers_[0]->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, CdsSecondResourceNotPresentInRequest) {
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
WaitForAllBackends();
// Change route config to point to non-existing cluster.
const char* kNewClusterName = "new_cluster_name";
RouteConfiguration route_config = default_route_config_;
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
balancers_[0]->ads_service()->SetRdsResource(route_config);
// New cluster times out.
// May need to wait a bit for the change to propagate to the client.
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
bool error_seen = false;
do {
auto status = SendRpc();
if (status.error_code() == StatusCode::UNAVAILABLE) {
error_seen = true;
break;
}
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
EXPECT_TRUE(error_seen);
}
TEST_P(TimeoutTest, EdsServerIgnoresRequest) {
balancers_[0]->ads_service()->IgnoreResourceType(kEdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, EdsResourceNotPresentInRequest) {
// No need to remove EDS resource, since the test suite does not add it
// by default.
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) {
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
WaitForAllBackends();
// New cluster that points to a non-existant EDS resource.
const char* kNewClusterName = "new_cluster_name";
Cluster cluster = default_cluster_;
cluster.set_name(kNewClusterName);
cluster.mutable_eds_cluster_config()->set_service_name(
"eds_service_name_does_not_exist");
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Now add a route pointing to the new cluster.
RouteConfiguration route_config = default_route_config_;
auto* route = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
*route_config.mutable_virtual_hosts(0)->add_routes() = *route;
route->mutable_match()->set_path("/grpc.testing.EchoTestService/Echo1");
route->mutable_route()->set_cluster(kNewClusterName);
balancers_[0]->ads_service()->SetRdsResource(route_config);
// New EDS resource times out.
// May need to wait a bit for the RDS change to propagate to the client.
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
bool error_seen = false;
do {
auto status = SendRpc(RpcOptions().set_rpc_method(METHOD_ECHO1));
if (status.error_code() == StatusCode::UNAVAILABLE) {
error_seen = true;
break;
}
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
EXPECT_TRUE(error_seen);
}
using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their
@ -12398,9 +12557,13 @@ INSTANTIATE_TEST_SUITE_P(
// Do this only for XdsResolver with RDS enabled, so that we can test
// all resource types.
// Run with V3 only, since the functionality is no different in V2.
INSTANTIATE_TEST_SUITE_P(XdsTest, TimeoutTest,
::testing::Values(TestType().set_enable_rds_testing()),
&TestTypeName);
// Run with bootstrap from env var so that multiple channels share the same
// XdsClient (needed for testing the timeout for the 2nd LDS and RDS resource).
INSTANTIATE_TEST_SUITE_P(
XdsTest, TimeoutTest,
::testing::Values(TestType().set_enable_rds_testing().set_bootstrap_source(
TestType::kBootstrapFromEnvVar)),
&TestTypeName);
// XdsResolverOnlyTest depends on XdsResolver.
INSTANTIATE_TEST_SUITE_P(

Loading…
Cancel
Save