diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 4bc15ef6385..c9676df6da1 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -1909,7 +1909,6 @@ grpc_error_handle RouteConfigParse( const EncodingContext& context, const envoy_config_route_v3_RouteConfiguration* route_config, bool /*is_v2*/, XdsApi::RdsUpdate* rds_update) { - MaybeLogRouteConfiguration(context, route_config); // Get the virtual hosts. size_t num_virtual_hosts; const envoy_config_route_v3_VirtualHost* const* virtual_hosts = diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 034556bd347..ce07f0392ba 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -173,10 +173,10 @@ class XdsClient::ChannelState::AdsCallState bool seen_response() const { return seen_response_; } void SubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& resource) + const XdsApi::ResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void UnsubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& resource, + const XdsApi::ResourceName& name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -185,24 +185,20 @@ class XdsClient::ChannelState::AdsCallState private: class ResourceState : public InternallyRefCounted { public: - ResourceState(const std::string& type_url, - const XdsApi::ResourceName& resource, - bool sent_initial_request) - : type_url_(type_url), - resource_(resource), - sent_initial_request_(sent_initial_request) { + ResourceState(const std::string& type_url, const XdsApi::ResourceName& name) + : type_url_(type_url), name_(name) { GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, grpc_schedule_on_exec_ctx); } void Orphan() override { - Finish(); + MaybeCancelTimer(); Unref(DEBUG_LOCATION, "Orphan"); } - void Start(RefCountedPtr ads_calld) { - if (sent_initial_request_) return; - sent_initial_request_ = true; + void MaybeStartTimer(RefCountedPtr ads_calld) { + if (timer_started_) return; + timer_started_ = true; ads_calld_ = std::move(ads_calld); Ref(DEBUG_LOCATION, "timer").release(); timer_pending_ = true; @@ -212,7 +208,7 @@ class XdsClient::ChannelState::AdsCallState &timer_callback_); } - void Finish() { + void MaybeCancelTimer() { if (timer_pending_) { grpc_timer_cancel(&timer_); timer_pending_ = false; @@ -239,8 +235,8 @@ class XdsClient::ChannelState::AdsCallState GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( "timeout obtaining resource {type=%s name=%s} from xds server", type_url_, - XdsApi::ConstructFullResourceName(resource_.authority, - type_url_, resource_.id))); + XdsApi::ConstructFullResourceName(name_.authority, type_url_, + name_.id))); watcher_error = grpc_error_set_int( watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -248,28 +244,27 @@ class XdsClient::ChannelState::AdsCallState grpc_error_std_string(watcher_error).c_str()); } auto& authority_state = - ads_calld_->xds_client()->authority_state_map_[resource_.authority]; + ads_calld_->xds_client()->authority_state_map_[name_.authority]; if (type_url_ == XdsApi::kLdsTypeUrl) { - ListenerState& state = authority_state.listener_map[resource_.id]; + ListenerState& state = authority_state.listener_map[name_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( ads_calld_->xds_client(), state.watchers, GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kRdsTypeUrl) { - RouteConfigState& state = - authority_state.route_config_map[resource_.id]; + RouteConfigState& state = authority_state.route_config_map[name_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( ads_calld_->xds_client(), state.watchers, GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kCdsTypeUrl) { - ClusterState& state = authority_state.cluster_map[resource_.id]; + ClusterState& state = authority_state.cluster_map[name_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( ads_calld_->xds_client(), state.watchers, GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kEdsTypeUrl) { - EndpointState& state = authority_state.endpoint_map[resource_.id]; + EndpointState& state = authority_state.endpoint_map[name_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( ads_calld_->xds_client(), state.watchers, @@ -283,10 +278,10 @@ class XdsClient::ChannelState::AdsCallState } const std::string type_url_; - const XdsApi::ResourceName resource_; + const XdsApi::ResourceName name_; RefCountedPtr ads_calld_; - bool sent_initial_request_; + bool timer_started_ = false; bool timer_pending_ = false; grpc_timer timer_; grpc_closure timer_callback_; @@ -618,7 +613,7 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { } void XdsClient::ChannelState::SubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& resource) { + const std::string& type_url, const XdsApi::ResourceName& name) { if (ads_calld_ == nullptr) { // Start the ADS call if this is the first request. ads_calld_.reset(new RetryableCall( @@ -632,16 +627,16 @@ void XdsClient::ChannelState::SubscribeLocked( // because when the call is restarted it will resend all necessary requests. if (ads_calld() == nullptr) return; // Subscribe to this resource if the ADS call is active. - ads_calld()->SubscribeLocked(type_url, resource); + ads_calld()->SubscribeLocked(type_url, name); } void XdsClient::ChannelState::UnsubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& resource, + const std::string& type_url, const XdsApi::ResourceName& name, bool delay_unsubscription) { if (ads_calld_ != nullptr) { auto* calld = ads_calld_->calld(); if (calld != nullptr) { - calld->UnsubscribeLocked(type_url, resource, delay_unsubscription); + calld->UnsubscribeLocked(type_url, name, delay_unsubscription); if (!calld->HasSubscribedResources()) { ads_calld_.reset(); } @@ -942,25 +937,23 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } void XdsClient::ChannelState::AdsCallState::SubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& resource) { - auto& state = state_map_[type_url] - .subscribed_resources[resource.authority][resource.id]; + const std::string& type_url, const XdsApi::ResourceName& name) { + auto& state = + state_map_[type_url].subscribed_resources[name.authority][name.id]; if (state == nullptr) { - state = MakeOrphanable( - type_url, resource, - !chand()->resource_type_version_map_[type_url].empty()); + state = MakeOrphanable(type_url, name); SendMessageLocked(type_url); } } void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& resource, + const std::string& type_url, const XdsApi::ResourceName& name, bool delay_unsubscription) { auto& type_state_map = state_map_[type_url]; - auto& authority_map = type_state_map.subscribed_resources[resource.authority]; - authority_map.erase(resource.id); + auto& authority_map = type_state_map.subscribed_resources[name.authority]; + authority_map.erase(name.id); if (authority_map.empty()) { - type_state_map.subscribed_resources.erase(resource.authority); + type_state_map.subscribed_resources.erase(name.authority); } if (!delay_unsubscription) SendMessageLocked(type_url); } @@ -1001,15 +994,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; std::set rds_resource_names_seen; for (auto& p : lds_update_map) { - const XdsApi::ResourceName& resource = p.first; + const XdsApi::ResourceName& name = p.first; XdsApi::LdsUpdate& lds_update = p.second.resource; - auto& state = - lds_state.subscribed_resources[resource.authority][resource.id]; - if (state != nullptr) state->Finish(); + auto it = lds_state.subscribed_resources.find(name.authority); + if (it != lds_state.subscribed_resources.end()) { + auto res_it = it->second.find(name.id); + if (res_it != it->second.end()) { + res_it->second->MaybeCancelTimer(); + } + } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(), - XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kLdsTypeUrl, resource.id) + XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kLdsTypeUrl, name.id) .c_str(), lds_update.ToString().c_str()); } @@ -1018,10 +1015,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( rds_resource_names_seen.insert( lds_update.http_connection_manager.route_config_name); } - ListenerState& listener_state = - xds_client() - ->authority_state_map_[resource.authority] - .listener_map[resource.id]; + ListenerState& listener_state = xds_client() + ->authority_state_map_[name.authority] + .listener_map[name.id]; // Ignore identical update. if (listener_state.update.has_value() && *listener_state.update == lds_update) { @@ -1030,8 +1026,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( "[xds_client %p] LDS update for %s identical to current, " "ignoring.", xds_client(), - XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kLdsTypeUrl, resource.id) + XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kLdsTypeUrl, name.id) .c_str()); } continue; @@ -1055,14 +1051,14 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( // For invalid resources in the update, if they are already in the // cache, pretend that they are present in the update, so that we // don't incorrectly consider them deleted below. - for (const auto& resource : resource_names_failed) { + for (const auto& name : resource_names_failed) { auto& listener_map = - xds_client()->authority_state_map_[resource.authority].listener_map; - auto it = listener_map.find(resource.id); + xds_client()->authority_state_map_[name.authority].listener_map; + auto it = listener_map.find(name.id); if (it != listener_map.end()) { auto& update = it->second.update; if (!update.has_value()) continue; - lds_update_map[resource]; + lds_update_map[name]; if (!update->http_connection_manager.route_config_name.empty()) { rds_resource_names_seen.insert( update->http_connection_manager.route_config_name); @@ -1128,19 +1124,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( } auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; for (auto& p : rds_update_map) { - const XdsApi::ResourceName& resource = p.first; + const XdsApi::ResourceName& name = p.first; XdsApi::RdsUpdate& rds_update = p.second.resource; - auto& state = - rds_state.subscribed_resources[resource.authority][resource.id]; - if (state != nullptr) state->Finish(); + auto it = rds_state.subscribed_resources.find(name.authority); + if (it != rds_state.subscribed_resources.end()) { + auto res_it = it->second.find(name.id); + if (res_it != it->second.end()) { + res_it->second->MaybeCancelTimer(); + } + } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(), rds_update.ToString().c_str()); } RouteConfigState& route_config_state = xds_client() - ->authority_state_map_[resource.authority] - .route_config_map[resource.id]; + ->authority_state_map_[name.authority] + .route_config_map[name.id]; // Ignore identical update. if (route_config_state.update.has_value() && *route_config_state.update == rds_update) { @@ -1182,27 +1182,30 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( auto& cds_state = state_map_[XdsApi::kCdsTypeUrl]; std::set eds_resource_names_seen; for (auto& p : cds_update_map) { - const XdsApi::ResourceName& resource = p.first; + const XdsApi::ResourceName& name = p.first; XdsApi::CdsUpdate& cds_update = p.second.resource; - auto& state = - cds_state.subscribed_resources[resource.authority][resource.id]; - if (state != nullptr) state->Finish(); + auto it = cds_state.subscribed_resources.find(name.authority); + if (it != cds_state.subscribed_resources.end()) { + auto res_it = it->second.find(name.id); + if (res_it != it->second.end()) { + res_it->second->MaybeCancelTimer(); + } + } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(), - XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kCdsTypeUrl, resource.id) + XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kCdsTypeUrl, name.id) .c_str(), cds_update.ToString().c_str()); } // Record the EDS resource names seen. eds_resource_names_seen.insert( cds_update.eds_service_name.empty() - ? XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kCdsTypeUrl, resource.id) + ? XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kCdsTypeUrl, name.id) : cds_update.eds_service_name); - ClusterState& cluster_state = xds_client() - ->authority_state_map_[resource.authority] - .cluster_map[resource.id]; + ClusterState& cluster_state = + xds_client()->authority_state_map_[name.authority].cluster_map[name.id]; // Ignore identical update. if (cluster_state.update.has_value() && *cluster_state.update == cds_update) { @@ -1232,18 +1235,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( // For invalid resources in the update, if they are already in the // cache, pretend that they are present in the update, so that we // don't incorrectly consider them deleted below. - for (const auto& resource : resource_names_failed) { + for (const auto& name : resource_names_failed) { auto& cluster_map = - xds_client()->authority_state_map_[resource.authority].cluster_map; - auto it = cluster_map.find(resource.id); + xds_client()->authority_state_map_[name.authority].cluster_map; + auto it = cluster_map.find(name.id); if (it != cluster_map.end()) { auto& update = it->second.update; if (!update.has_value()) continue; - cds_update_map[resource]; + cds_update_map[name]; eds_resource_names_seen.insert( update->eds_service_name.empty() - ? XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kCdsTypeUrl, resource.id) + ? XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kCdsTypeUrl, name.id) : update->eds_service_name); } } @@ -1304,22 +1307,25 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked( } auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; for (auto& p : eds_update_map) { - const XdsApi::ResourceName& resource = p.first; + const XdsApi::ResourceName& name = p.first; XdsApi::EdsUpdate& eds_update = p.second.resource; - auto& state = - eds_state.subscribed_resources[resource.authority][resource.id]; - if (state != nullptr) state->Finish(); + auto it = eds_state.subscribed_resources.find(name.authority); + if (it != eds_state.subscribed_resources.end()) { + auto res_it = it->second.find(name.id); + if (res_it != it->second.end()) { + res_it->second->MaybeCancelTimer(); + } + } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(), - XdsApi::ConstructFullResourceName( - resource.authority, XdsApi::kCdsTypeUrl, resource.id) + XdsApi::ConstructFullResourceName(name.authority, + XdsApi::kCdsTypeUrl, name.id) .c_str(), eds_update.ToString().c_str()); } - EndpointState& endpoint_state = - xds_client() - ->authority_state_map_[resource.authority] - .endpoint_map[resource.id]; + EndpointState& endpoint_state = xds_client() + ->authority_state_map_[name.authority] + .endpoint_map[name.id]; // Ignore identical update. if (endpoint_state.update.has_value() && *endpoint_state.update == eds_update) { @@ -1605,7 +1611,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( for (auto& p : a.second) { resource_map[a.first].insert(p.first); OrphanablePtr& state = p.second; - state->Start(Ref(DEBUG_LOCATION, "ResourceState")); + state->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceState")); } } } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 153b8c495c8..431f7c471fe 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -235,10 +235,10 @@ class XdsClient : public DualRefCounted { void CancelConnectivityWatchLocked(); void SubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& resource) + const XdsApi::ResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void UnsubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& resource, + const XdsApi::ResourceName& name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index 7627b509bc2..e4c45a1f0c3 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -999,11 +999,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam { EchoResponse* response) { switch (rpc_options.method) { case METHOD_ECHO: - return (*stub)->Echo(context, request, response); + return stub->Echo(context, request, response); case METHOD_ECHO1: - return (*stub)->Echo1(context, request, response); + return stub->Echo1(context, request, response); case METHOD_ECHO2: - return (*stub)->Echo2(context, request, response); + return stub->Echo2(context, request, response); } GPR_UNREACHABLE_CODE(); } @@ -1240,16 +1240,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam { Status status; switch (rpc_options.service) { case SERVICE_ECHO: - status = - SendRpcMethod(&stub_, rpc_options, &context, request, response); + status = SendRpcMethod(stub_.get(), rpc_options, &context, request, + response); break; case SERVICE_ECHO1: - status = - SendRpcMethod(&stub1_, rpc_options, &context, request, response); + status = SendRpcMethod(stub1_.get(), rpc_options, &context, request, + response); break; case SERVICE_ECHO2: - status = - SendRpcMethod(&stub2_, rpc_options, &context, request, response); + status = SendRpcMethod(stub2_.get(), rpc_options, &context, request, + response); break; } if (local_response) delete response; @@ -9865,39 +9865,198 @@ class TimeoutTest : public XdsEnd2endTest { : XdsEnd2endTest(/* num_backends= */ 4, /* num_balancers= */ 1, /*client_load_reporting_interval_seconds= */ 100, /* xds_resource_does_not_exist_timeout_ms */ 500, - /* use_xds_enabled_server= */ false) {} + /* use_xds_enabled_server= */ false) { + StartAllBackends(); + } }; -// Tests that LDS client times out when no response received. -TEST_P(TimeoutTest, Lds) { +TEST_P(TimeoutTest, LdsServerIgnoresRequest) { balancers_[0]->ads_service()->IgnoreResourceType(kLdsTypeUrl); SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); } -TEST_P(TimeoutTest, Rds) { +TEST_P(TimeoutTest, LdsResourceNotPresentInRequest) { + balancers_[0]->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + +TEST_P(TimeoutTest, LdsSecondResourceNotPresentInRequest) { + ASSERT_NE(GetParam().bootstrap_source(), TestType::kBootstrapFromChannelArg) + << "This test cannot use bootstrap from channel args, because it " + "needs two channels to use the same XdsClient instance."; + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + WaitForAllBackends(); + // Create second channel for a new server name. + // This should fail because there is no LDS resource for this server name. + auto channel2 = + CreateChannel(/*failover_timeout=*/0, "new-server.example.com"); + auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); + ClientContext context; + EchoRequest request; + EchoResponse response; + RpcOptions rpc_options; + rpc_options.SetupRpc(&context, &request); + auto status = + SendRpcMethod(stub2.get(), rpc_options, &context, request, &response); + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); +} + +TEST_P(TimeoutTest, RdsServerIgnoresRequest) { balancers_[0]->ads_service()->IgnoreResourceType(kRdsTypeUrl); SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); } -// Tests that CDS client times out when no response received. -TEST_P(TimeoutTest, Cds) { +TEST_P(TimeoutTest, RdsResourceNotPresentInRequest) { + balancers_[0]->ads_service()->UnsetResource(kRdsTypeUrl, + kDefaultRouteConfigurationName); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + +TEST_P(TimeoutTest, RdsSecondResourceNotPresentInRequest) { + ASSERT_NE(GetParam().bootstrap_source(), TestType::kBootstrapFromChannelArg) + << "This test cannot use bootstrap from channel args, because it " + "needs two channels to use the same XdsClient instance."; + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Add listener for 2nd channel, but no RDS resource. + const char* kNewServerName = "new-server.example.com"; + Listener listener = default_listener_; + listener.set_name(kNewServerName); + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + auto* rds = http_connection_manager.mutable_rds(); + rds->set_route_config_name("rds_resource_does_not_exist"); + rds->mutable_config_source()->mutable_ads(); + ClientHcmAccessor().Pack(http_connection_manager, &listener); + balancers_[0]->ads_service()->SetLdsResource(listener); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + WaitForAllBackends(); + // Create second channel for a new server name. + // This should fail because the LDS resource points to a non-existent RDS + // resource. + auto channel2 = CreateChannel(/*failover_timeout=*/0, kNewServerName); + auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); + ClientContext context; + EchoRequest request; + EchoResponse response; + RpcOptions rpc_options; + rpc_options.SetupRpc(&context, &request); + auto status = + SendRpcMethod(stub2.get(), rpc_options, &context, request, &response); + EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); +} + +TEST_P(TimeoutTest, CdsServerIgnoresRequest) { balancers_[0]->ads_service()->IgnoreResourceType(kCdsTypeUrl); SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); } -TEST_P(TimeoutTest, Eds) { +TEST_P(TimeoutTest, CdsResourceNotPresentInRequest) { + balancers_[0]->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + +TEST_P(TimeoutTest, CdsSecondResourceNotPresentInRequest) { + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + WaitForAllBackends(); + // Change route config to point to non-existing cluster. + const char* kNewClusterName = "new_cluster_name"; + RouteConfiguration route_config = default_route_config_; + route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + balancers_[0]->ads_service()->SetRdsResource(route_config); + // New cluster times out. + // May need to wait a bit for the change to propagate to the client. + gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); + bool error_seen = false; + do { + auto status = SendRpc(); + if (status.error_code() == StatusCode::UNAVAILABLE) { + error_seen = true; + break; + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); + EXPECT_TRUE(error_seen); +} + +TEST_P(TimeoutTest, EdsServerIgnoresRequest) { balancers_[0]->ads_service()->IgnoreResourceType(kEdsTypeUrl); SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); CheckRpcSendFailure(); } +TEST_P(TimeoutTest, EdsResourceNotPresentInRequest) { + // No need to remove EDS resource, since the test suite does not add it + // by default. + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + +TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) { + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args)); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + WaitForAllBackends(); + // New cluster that points to a non-existant EDS resource. + const char* kNewClusterName = "new_cluster_name"; + Cluster cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name( + "eds_service_name_does_not_exist"); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Now add a route pointing to the new cluster. + RouteConfiguration route_config = default_route_config_; + auto* route = route_config.mutable_virtual_hosts(0)->mutable_routes(0); + *route_config.mutable_virtual_hosts(0)->add_routes() = *route; + route->mutable_match()->set_path("/grpc.testing.EchoTestService/Echo1"); + route->mutable_route()->set_cluster(kNewClusterName); + balancers_[0]->ads_service()->SetRdsResource(route_config); + // New EDS resource times out. + // May need to wait a bit for the RDS change to propagate to the client. + gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); + bool error_seen = false; + do { + auto status = SendRpc(RpcOptions().set_rpc_method(METHOD_ECHO1)); + if (status.error_code() == StatusCode::UNAVAILABLE) { + error_seen = true; + break; + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); + EXPECT_TRUE(error_seen); +} + using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their @@ -12398,9 +12557,13 @@ INSTANTIATE_TEST_SUITE_P( // Do this only for XdsResolver with RDS enabled, so that we can test // all resource types. // Run with V3 only, since the functionality is no different in V2. -INSTANTIATE_TEST_SUITE_P(XdsTest, TimeoutTest, - ::testing::Values(TestType().set_enable_rds_testing()), - &TestTypeName); +// Run with bootstrap from env var so that multiple channels share the same +// XdsClient (needed for testing the timeout for the 2nd LDS and RDS resource). +INSTANTIATE_TEST_SUITE_P( + XdsTest, TimeoutTest, + ::testing::Values(TestType().set_enable_rds_testing().set_bootstrap_source( + TestType::kBootstrapFromEnvVar)), + &TestTypeName); // XdsResolverOnlyTest depends on XdsResolver. INSTANTIATE_TEST_SUITE_P(