diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 89fd15faf1a..ab4c39f9310 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -360,6 +360,11 @@ typedef struct {
    of that priority fail to connect. If 0, failover happens immediately. Default
    value is 10 seconds. */
 #define GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS "grpc.xds_failover_timeout_ms"
+/* Timeout in milliseconds to wait for a resource to be returned from
+ * the xds server before assuming that it does not exist.
+ * The default is 15 seconds. */
+#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
+  "grpc.xds_resource_does_not_exist_timeout_ms"
 /** If non-zero, grpc server's cronet compression workaround will be enabled */
 #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
   "grpc.workaround.cronet_compression"
diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
index 32f5e5c3523..e7eed60c446 100644
--- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
@@ -68,6 +68,7 @@ class XdsResolver : public Resolver {
 
 void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
     RefCountedPtr<ServiceConfig> service_config) {
+  if (resolver_->xds_client_ == nullptr) return;
   grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
   Result result;
   result.args =
@@ -77,6 +78,7 @@ void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
 }
 
 void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
+  if (resolver_->xds_client_ == nullptr) return;
   grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
   Result result;
   result.args =
diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc
index 5b27c5d5904..dc7d10a7f4c 100644
--- a/src/core/ext/filters/client_channel/xds/xds_api.cc
+++ b/src/core/ext/filters/client_channel/xds/xds_api.cc
@@ -1119,10 +1119,12 @@ void LocalityStatsPopulate(
 }  // namespace
 
 grpc_slice XdsLrsRequestCreateAndEncode(
-    std::map<StringView, std::set<XdsClientStats*>> client_stats_map) {
+    std::map<StringView, std::set<XdsClientStats*>, StringLess>
+        client_stats_map) {
   upb::Arena arena;
   // Get the snapshots.
-  std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>>
+  std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>,
+           StringLess>
       snapshot_map;
   for (auto& p : client_stats_map) {
     const StringView& cluster_name = p.first;
diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h
index 48c7282772f..f98d32b1e38 100644
--- a/src/core/ext/filters/client_channel/xds/xds_api.h
+++ b/src/core/ext/filters/client_channel/xds/xds_api.h
@@ -41,19 +41,6 @@ constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
 constexpr char kEdsTypeUrl[] =
     "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
 
-// The version state for each specific ADS resource type.
-struct VersionState {
-  // The version of the latest response that is accepted and used.
-  std::string version_info;
-  // The nonce of the latest response.
-  std::string nonce;
-  // The error message to be included in a NACK with the nonce. Consumed when a
-  // nonce is NACK'ed for the first time.
-  grpc_error* error = GRPC_ERROR_NONE;
-
-  ~VersionState() { GRPC_ERROR_UNREF(error); }
-};
-
 struct RdsUpdate {
   // The name to use in the CDS request.
   std::string cluster_name;
@@ -246,7 +233,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
 // Creates an LRS request sending client-side load reports. If all the counters
 // are zero, returns empty slice.
 grpc_slice XdsLrsRequestCreateAndEncode(
-    std::map<StringView /*cluster_name*/, std::set<XdsClientStats*>>
+    std::map<StringView /*cluster_name*/, std::set<XdsClientStats*>, StringLess>
         client_stats_map);
 
 // Parses the LRS response and returns \a
diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc
index 52bda5aa8f3..7e3c7ba04ec 100644
--- a/src/core/ext/filters/client_channel/xds/xds_client.cc
+++ b/src/core/ext/filters/client_channel/xds/xds_client.cc
@@ -125,31 +125,122 @@ class XdsClient::ChannelState::AdsCallState
   XdsClient* xds_client() const { return chand()->xds_client(); }
   bool seen_response() const { return seen_response_; }
 
-  // If \a type_url is an unsupported type, \a nonce_for_unsupported_type and
-  // \a error_for_unsupported_type will be used in the request; otherwise, the
-  // nonce and error stored in each ADS call state will be used. Takes ownership
-  // of \a error_for_unsupported_type.
-  void SendMessageLocked(const std::string& type_url,
-                         const std::string& nonce_for_unsupported_type,
-                         grpc_error* error_for_unsupported_type,
-                         bool is_first_message);
+  void Subscribe(const std::string& type_url, const std::string& name);
+  void Unsubscribe(const std::string& type_url, const std::string& name);
+
+  bool HasSubscribedResources() const;
 
  private:
-  struct BufferedRequest {
-    std::string nonce;
-    grpc_error* error;
+  class ResourceState : public InternallyRefCounted<ResourceState> {
+   public:
+    ResourceState(const std::string& type_url, const std::string& name)
+        : type_url_(type_url), name_(name) {
+      GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
+                        grpc_schedule_on_exec_ctx);
+    }
+
+    void Orphan() override {
+      Finish();
+      Unref();
+    }
+
+    void Start(RefCountedPtr<AdsCallState> ads_calld) {
+      if (sent_) return;
+      sent_ = true;
+      ads_calld_ = std::move(ads_calld);
+      Ref().release();
+      timer_pending_ = true;
+      grpc_timer_init(
+          &timer_,
+          ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
+          &timer_callback_);
+    }
+
+    void Finish() {
+      if (timer_pending_) {
+        grpc_timer_cancel(&timer_);
+        timer_pending_ = false;
+      }
+    }
 
-    // Takes ownership of \a error.
-    BufferedRequest(std::string nonce, grpc_error* error)
-        : nonce(std::move(nonce)), error(error) {}
+   private:
+    static void OnTimer(void* arg, grpc_error* error) {
+      ResourceState* self = static_cast<ResourceState*>(arg);
+      self->ads_calld_->xds_client()->combiner_->Run(
+          GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self,
+                            nullptr),
+          GRPC_ERROR_REF(error));
+    }
 
-    ~BufferedRequest() { GRPC_ERROR_UNREF(error); }
+    static void OnTimerLocked(void* arg, grpc_error* error) {
+      ResourceState* self = static_cast<ResourceState*>(arg);
+      if (error == GRPC_ERROR_NONE && self->timer_pending_) {
+        self->timer_pending_ = false;
+        char* msg;
+        gpr_asprintf(
+            &msg,
+            "timeout obtaining resource {type=%s name=%s} from xds server",
+            self->type_url_.c_str(), self->name_.c_str());
+        grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+        gpr_free(msg);
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+          gpr_log(GPR_INFO, "[xds_client %p] %s",
+                  self->ads_calld_->xds_client(), grpc_error_string(error));
+        }
+        if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) {
+          self->ads_calld_->xds_client()->service_config_watcher_->OnError(
+              error);
+        } else if (self->type_url_ == kCdsTypeUrl) {
+          ClusterState& state =
+              self->ads_calld_->xds_client()->cluster_map_[self->name_];
+          for (const auto& p : state.watchers) {
+            p.first->OnError(GRPC_ERROR_REF(error));
+          }
+          GRPC_ERROR_UNREF(error);
+        } else if (self->type_url_ == kEdsTypeUrl) {
+          EndpointState& state =
+              self->ads_calld_->xds_client()->endpoint_map_[self->name_];
+          for (const auto& p : state.watchers) {
+            p.first->OnError(GRPC_ERROR_REF(error));
+          }
+          GRPC_ERROR_UNREF(error);
+        } else {
+          GPR_UNREACHABLE_CODE(return );
+        }
+      }
+      self->ads_calld_.reset();
+      self->Unref();
+    }
+
+    const std::string type_url_;
+    const std::string name_;
+
+    RefCountedPtr<AdsCallState> ads_calld_;
+    bool sent_ = false;
+    bool timer_pending_ = false;
+    grpc_timer timer_;
+    grpc_closure timer_callback_;
   };
 
-  void AcceptLdsUpdate(LdsUpdate lds_update, std::string new_version);
-  void AcceptRdsUpdate(RdsUpdate rds_update, std::string new_version);
-  void AcceptCdsUpdate(CdsUpdateMap cds_update_map, std::string new_version);
-  void AcceptEdsUpdate(EdsUpdateMap eds_update_map, std::string new_version);
+  struct ResourceTypeState {
+    ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
+
+    // Version, nonce, and error for this resource type.
+    std::string version;
+    std::string nonce;
+    grpc_error* error = GRPC_ERROR_NONE;
+
+    // Subscribed resources of this type.
+    std::map<std::string /* name */, OrphanablePtr<ResourceState>>
+        subscribed_resources;
+  };
+
+  void SendMessageLocked(const std::string& type_url);
+
+  void AcceptLdsUpdate(LdsUpdate lds_update);
+  void AcceptRdsUpdate(RdsUpdate rds_update);
+  void AcceptCdsUpdate(CdsUpdateMap cds_update_map);
+  void AcceptEdsUpdate(EdsUpdateMap eds_update_map);
 
   static void OnRequestSent(void* arg, grpc_error* error);
   static void OnRequestSentLocked(void* arg, grpc_error* error);
@@ -160,8 +251,13 @@ class XdsClient::ChannelState::AdsCallState
 
   bool IsCurrentCallOnChannel() const;
 
+  std::set<StringView> ClusterNamesForRequest();
+  std::set<StringView> EdsServiceNamesForRequest();
+
   // The owning RetryableCall<>.
   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
+
+  bool sent_initial_message_ = false;
   bool seen_response_ = false;
 
   // Always non-NULL.
@@ -184,15 +280,11 @@ class XdsClient::ChannelState::AdsCallState
   grpc_slice status_details_;
   grpc_closure on_status_received_;
 
-  // Version state.
-  VersionState lds_version_;
-  VersionState rds_version_;
-  VersionState cds_version_;
-  VersionState eds_version_;
+  // Resource types for which requests need to be sent.
+  std::set<std::string /*type_url*/> buffered_requests_;
 
-  // Buffered requests.
-  std::map<std::string /*type_url*/, std::unique_ptr<BufferedRequest>>
-      buffered_request_map_;
+  // State for each resource type.
+  std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
 };
 
 // Contains an LRS call to the xds server.
@@ -445,31 +537,30 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
 }
 
-void XdsClient::ChannelState::OnResourceNamesChanged(
-    const std::string& type_url) {
+void XdsClient::ChannelState::Subscribe(const std::string& type_url,
+                                        const std::string& name) {
   if (ads_calld_ == nullptr) {
     // Start the ADS call if this is the first request.
     ads_calld_.reset(new RetryableCall<AdsCallState>(
         Ref(DEBUG_LOCATION, "ChannelState+ads")));
-    // Note: AdsCallState's ctor will automatically send necessary messages, so
-    // we can return here.
+    // Note: AdsCallState's ctor will automatically subscribe to all
+    // resources that the XdsClient already has watchers for, so we can
+    // return here.
     return;
   }
   // If the ADS call is in backoff state, we don't need to do anything now
   // because when the call is restarted it will resend all necessary requests.
   if (ads_calld() == nullptr) return;
-  // Send the message if the ADS call is active.
-  ads_calld()->SendMessageLocked(type_url, "", nullptr, false);
+  // Subscribe to this resource if the ADS call is active.
+  ads_calld()->Subscribe(type_url, name);
 }
 
-void XdsClient::ChannelState::OnWatcherRemoved() {
-  // Keep the ADS call if there are watcher(s).
-  for (const auto& p : xds_client()->cluster_map_) {
-    const ClusterState& cluster_state = p.second;
-    if (!cluster_state.watchers.empty()) return;
+void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
+                                          const std::string& name) {
+  if (ads_calld_ != nullptr) {
+    ads_calld_->calld()->Unsubscribe(type_url, name);
+    if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
   }
-  if (!xds_client()->endpoint_map_.empty()) return;
-  ads_calld_.reset();
 }
 
 //
@@ -620,22 +711,14 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
   // Op: send request message.
   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                     grpc_schedule_on_exec_ctx);
-  bool initial_message = true;
   if (xds_client()->service_config_watcher_ != nullptr) {
-    if (xds_client()->route_config_name_.empty()) {
-      SendMessageLocked(kLdsTypeUrl, "", nullptr, initial_message);
-      initial_message = false;
-    } else if (xds_client()->cluster_name_.empty()) {
-      SendMessageLocked(kRdsTypeUrl, "", nullptr, initial_message);
-      initial_message = false;
-    }
+    Subscribe(kLdsTypeUrl, xds_client()->server_name_);
   }
-  if (!xds_client()->cluster_map_.empty()) {
-    SendMessageLocked(kCdsTypeUrl, "", nullptr, initial_message);
-    initial_message = false;
+  for (const auto& p : xds_client()->cluster_map_) {
+    Subscribe(kCdsTypeUrl, std::string(p.first));
   }
-  if (!xds_client()->endpoint_map_.empty()) {
-    SendMessageLocked(kEdsTypeUrl, "", nullptr, initial_message);
+  for (const auto& p : xds_client()->endpoint_map_) {
+    Subscribe(kEdsTypeUrl, std::string(p.first));
   }
   // Op: recv initial metadata.
   op = ops;
@@ -693,51 +776,49 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
   // we are here because xds_client has to orphan a failed call, then the
   // following cancellation will be a no-op.
   grpc_call_cancel(call_, nullptr);
+  state_map_.clear();
   // Note that the initial ref is hold by on_status_received_. So the
   // corresponding unref happens in on_status_received_ instead of here.
 }
 
 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
-    const std::string& type_url, const std::string& nonce_for_unsupported_type,
-    grpc_error* error_for_unsupported_type, bool is_first_message) {
+    const std::string& type_url) {
   // Buffer message sending if an existing message is in flight.
   if (send_message_payload_ != nullptr) {
-    buffered_request_map_[type_url].reset(new BufferedRequest(
-        nonce_for_unsupported_type, error_for_unsupported_type));
+    buffered_requests_.insert(type_url);
     return;
   }
-  grpc_slice request_payload_slice;
+  auto& state = state_map_[type_url];
+  grpc_error* error = state.error;
+  state.error = GRPC_ERROR_NONE;
   const XdsBootstrap::Node* node =
-      is_first_message ? xds_client()->bootstrap_->node() : nullptr;
+      sent_initial_message_ ? nullptr : xds_client()->bootstrap_->node();
   const char* build_version =
-      is_first_message ? xds_client()->build_version_.get() : nullptr;
+      sent_initial_message_ ? nullptr : xds_client()->build_version_.get();
+  sent_initial_message_ = true;
+  grpc_slice request_payload_slice;
   if (type_url == kLdsTypeUrl) {
     request_payload_slice = XdsLdsRequestCreateAndEncode(
-        xds_client()->server_name_, node, build_version,
-        lds_version_.version_info, lds_version_.nonce, lds_version_.error);
-    lds_version_.error = GRPC_ERROR_NONE;
-    GRPC_ERROR_UNREF(error_for_unsupported_type);
+        xds_client()->server_name_, node, build_version, state.version,
+        state.nonce, error);
+    state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
   } else if (type_url == kRdsTypeUrl) {
     request_payload_slice = XdsRdsRequestCreateAndEncode(
-        xds_client()->route_config_name_, node, build_version,
-        rds_version_.version_info, rds_version_.nonce, rds_version_.error);
-    rds_version_.error = GRPC_ERROR_NONE;
-    GRPC_ERROR_UNREF(error_for_unsupported_type);
+        xds_client()->route_config_name_, node, build_version, state.version,
+        state.nonce, error);
+    state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
   } else if (type_url == kCdsTypeUrl) {
     request_payload_slice = XdsCdsRequestCreateAndEncode(
-        xds_client()->WatchedClusterNames(), node, build_version,
-        cds_version_.version_info, cds_version_.nonce, cds_version_.error);
-    cds_version_.error = GRPC_ERROR_NONE;
-    GRPC_ERROR_UNREF(error_for_unsupported_type);
+        ClusterNamesForRequest(), node, build_version, state.version,
+        state.nonce, error);
   } else if (type_url == kEdsTypeUrl) {
     request_payload_slice = XdsEdsRequestCreateAndEncode(
-        xds_client()->EdsServiceNames(), node, build_version,
-        eds_version_.version_info, eds_version_.nonce, eds_version_.error);
-    eds_version_.error = GRPC_ERROR_NONE;
-    GRPC_ERROR_UNREF(error_for_unsupported_type);
+        EdsServiceNamesForRequest(), node, build_version, state.version,
+        state.nonce, error);
   } else {
     request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode(
-        type_url, nonce_for_unsupported_type, error_for_unsupported_type);
+        type_url, state.nonce, state.error);
+    state_map_.erase(type_url);
   }
   // Create message payload.
   send_message_payload_ =
@@ -761,8 +842,30 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
   }
 }
 
+void XdsClient::ChannelState::AdsCallState::Subscribe(
+    const std::string& type_url, const std::string& name) {
+  auto& state = state_map_[type_url].subscribed_resources[name];
+  if (state == nullptr) {
+    state = MakeOrphanable<ResourceState>(type_url, name);
+    SendMessageLocked(type_url);
+  }
+}
+
+void XdsClient::ChannelState::AdsCallState::Unsubscribe(
+    const std::string& type_url, const std::string& name) {
+  state_map_[type_url].subscribed_resources.erase(name);
+  SendMessageLocked(type_url);
+}
+
+bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
+  for (const auto& p : state_map_) {
+    if (!p.second.subscribed_resources.empty()) return true;
+  }
+  return false;
+}
+
 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
-    LdsUpdate lds_update, std::string new_version) {
+    LdsUpdate lds_update) {
   const std::string& cluster_name =
       lds_update.rds_update.has_value()
           ? lds_update.rds_update.value().cluster_name
@@ -775,6 +878,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
             xds_client(), lds_update.route_config_name.c_str(),
             cluster_name.c_str());
   }
+  auto& lds_state = state_map_[kLdsTypeUrl];
+  auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
+  if (state != nullptr) state->Finish();
   // Ignore identical update.
   if (xds_client()->route_config_name_ == lds_update.route_config_name &&
       xds_client()->cluster_name_ == cluster_name) {
@@ -802,19 +908,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
     }
   } else {
     // Send RDS request for dynamic resolution.
-    SendMessageLocked(kRdsTypeUrl, "", nullptr, false);
+    Subscribe(kRdsTypeUrl, xds_client()->route_config_name_);
   }
-  lds_version_.version_info = std::move(new_version);
 }
 
 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
-    RdsUpdate rds_update, std::string new_version) {
+    RdsUpdate rds_update) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     gpr_log(GPR_INFO,
             "[xds_client %p] RDS update received: "
             "cluster_name=%s",
             xds_client(), rds_update.cluster_name.c_str());
   }
+  auto& rds_state = state_map_[kRdsTypeUrl];
+  auto& state =
+      rds_state.subscribed_resources[xds_client()->route_config_name_];
+  if (state != nullptr) state->Finish();
   // Ignore identical update.
   if (xds_client()->cluster_name_ == rds_update.cluster_name) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@@ -835,14 +944,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
   } else {
     xds_client()->service_config_watcher_->OnError(error);
   }
-  rds_version_.version_info = std::move(new_version);
 }
 
 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
-    CdsUpdateMap cds_update_map, std::string new_version) {
+    CdsUpdateMap cds_update_map) {
+  auto& cds_state = state_map_[kCdsTypeUrl];
   for (auto& p : cds_update_map) {
     const char* cluster_name = p.first.c_str();
     CdsUpdate& cds_update = p.second;
+    auto& state = cds_state.subscribed_resources[cluster_name];
+    if (state != nullptr) state->Finish();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO,
               "[xds_client %p] CDS update (cluster=%s) received: "
@@ -875,14 +986,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
       p.first->OnClusterChanged(cluster_state.update.value());
     }
   }
-  cds_version_.version_info = std::move(new_version);
 }
 
 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
-    EdsUpdateMap eds_update_map, std::string new_version) {
+    EdsUpdateMap eds_update_map) {
+  auto& eds_state = state_map_[kEdsTypeUrl];
   for (auto& p : eds_update_map) {
     const char* eds_service_name = p.first.c_str();
     EdsUpdate& eds_update = p.second;
+    auto& state = eds_state.subscribed_resources[eds_service_name];
+    if (state != nullptr) state->Finish();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO,
               "[xds_client %p] EDS response with %" PRIuPTR
@@ -956,7 +1069,6 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
       p.first->OnEndpointChanged(endpoint_state.update);
     }
   }
-  eds_version_.version_info = std::move(new_version);
 }
 
 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
@@ -977,22 +1089,17 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
     self->send_message_payload_ = nullptr;
     // Continue to send another pending message if any.
     // TODO(roth): The current code to handle buffered messages has the
-    // advantage of sending only the most recent list of resource names for each
-    // resource type (no matter how many times that resource type has been
-    // requested to send while the current message sending is still pending).
-    // But its disadvantage is that we send the requests in fixed order of
-    // resource types. We need to fix this if we are seeing some resource
-    // type(s) starved due to frequent requests of other resource type(s).
-    for (auto& p : self->buffered_request_map_) {
-      const std::string& type_url = p.first;
-      std::unique_ptr<BufferedRequest>& buffered_request = p.second;
-      if (buffered_request != nullptr) {
-        self->SendMessageLocked(type_url, buffered_request->nonce,
-                                buffered_request->error, false);
-        buffered_request->error = GRPC_ERROR_NONE;
-        buffered_request.reset();
-        break;
-      }
+    // advantage of sending only the most recent list of resource names for
+    // each resource type (no matter how many times that resource type has
+    // been requested to send while the current message sending is still
+    // pending). But its disadvantage is that we send the requests in fixed
+    // order of resource types. We need to fix this if we are seeing some
+    // resource type(s) starved due to frequent requests of other resource
+    // type(s).
+    auto it = self->buffered_requests_.begin();
+    if (it != self->buffered_requests_.end()) {
+      self->SendMessageLocked(*it);
+      self->buffered_requests_.erase(it);
     }
   }
   self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
@@ -1043,8 +1150,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
   // Note that XdsAdsResponseDecodeAndParse() also validate the response.
   grpc_error* parse_error = XdsAdsResponseDecodeAndParse(
       response_slice, xds_client->server_name_, xds_client->route_config_name_,
-      xds_client->EdsServiceNames(), &lds_update, &rds_update, &cds_update_map,
-      &eds_update_map, &version, &nonce, &type_url);
+      ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
+      &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
   grpc_slice_unref_internal(response_slice);
   if (type_url.empty()) {
     // Ignore unparsable response.
@@ -1052,48 +1159,34 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
             xds_client, grpc_error_string(parse_error));
     GRPC_ERROR_UNREF(parse_error);
   } else {
-    // Update nonce and error.
-    if (type_url == kLdsTypeUrl) {
-      ads_calld->lds_version_.nonce = nonce;
-      GRPC_ERROR_UNREF(ads_calld->lds_version_.error);
-      ads_calld->lds_version_.error = GRPC_ERROR_REF(parse_error);
-    } else if (type_url == kRdsTypeUrl) {
-      ads_calld->rds_version_.nonce = nonce;
-      GRPC_ERROR_UNREF(ads_calld->rds_version_.error);
-      ads_calld->rds_version_.error = GRPC_ERROR_REF(parse_error);
-    } else if (type_url == kCdsTypeUrl) {
-      ads_calld->cds_version_.nonce = nonce;
-      GRPC_ERROR_UNREF(ads_calld->cds_version_.error);
-      ads_calld->cds_version_.error = GRPC_ERROR_REF(parse_error);
-    } else if (type_url == kEdsTypeUrl) {
-      ads_calld->eds_version_.nonce = nonce;
-      GRPC_ERROR_UNREF(ads_calld->eds_version_.error);
-      ads_calld->eds_version_.error = GRPC_ERROR_REF(parse_error);
-    }
+    // Update nonce.
+    auto& state = ads_calld->state_map_[type_url];
+    state.nonce = std::move(nonce);
     // NACK or ACK the response.
     if (parse_error != GRPC_ERROR_NONE) {
+      GRPC_ERROR_UNREF(state.error);
+      state.error = parse_error;
       // NACK unacceptable update.
       gpr_log(
           GPR_ERROR,
           "[xds_client %p] ADS response can't be accepted, NACKing. error=%s",
           xds_client, grpc_error_string(parse_error));
-      ads_calld->SendMessageLocked(type_url, nonce, parse_error, false);
+      ads_calld->SendMessageLocked(type_url);
     } else {
       ads_calld->seen_response_ = true;
       // Accept the ADS response according to the type_url.
       if (type_url == kLdsTypeUrl) {
-        ads_calld->AcceptLdsUpdate(std::move(lds_update), std::move(version));
+        ads_calld->AcceptLdsUpdate(std::move(lds_update));
       } else if (type_url == kRdsTypeUrl) {
-        ads_calld->AcceptRdsUpdate(std::move(rds_update), std::move(version));
+        ads_calld->AcceptRdsUpdate(std::move(rds_update));
       } else if (type_url == kCdsTypeUrl) {
-        ads_calld->AcceptCdsUpdate(std::move(cds_update_map),
-                                   std::move(version));
+        ads_calld->AcceptCdsUpdate(std::move(cds_update_map));
       } else if (type_url == kEdsTypeUrl) {
-        ads_calld->AcceptEdsUpdate(std::move(eds_update_map),
-                                   std::move(version));
+        ads_calld->AcceptEdsUpdate(std::move(eds_update_map));
       }
+      state.version = std::move(version);
       // ACK the update.
-      ads_calld->SendMessageLocked(type_url, nonce, nullptr, false);
+      ads_calld->SendMessageLocked(type_url);
       // Start load reporting if needed.
       auto& lrs_call = ads_calld->chand()->lrs_calld_;
       if (lrs_call != nullptr) {
@@ -1164,6 +1257,28 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
   return this == chand()->ads_calld_->calld();
 }
 
+std::set<StringView>
+XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
+  std::set<StringView> cluster_names;
+  for (auto& p : state_map_[kCdsTypeUrl].subscribed_resources) {
+    cluster_names.insert(p.first);
+    OrphanablePtr<ResourceState>& state = p.second;
+    state->Start(Ref());
+  }
+  return cluster_names;
+}
+
+std::set<StringView>
+XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() {
+  std::set<StringView> eds_names;
+  for (auto& p : state_map_[kEdsTypeUrl].subscribed_resources) {
+    eds_names.insert(p.first);
+    OrphanablePtr<ResourceState>& state = p.second;
+    state->Start(Ref());
+  }
+  return eds_names;
+}
+
 //
 // XdsClient::ChannelState::LrsCallState::Reporter
 //
@@ -1584,6 +1699,12 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
 
 namespace {
 
+grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
+  return grpc_channel_args_find_integer(
+      &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
+      {15000, 0, INT_MAX});
+}
+
 UniquePtr<char> GenerateBuildVersionString() {
   char* build_version_str;
   gpr_asprintf(&build_version_str, "gRPC C-core %s %s", grpc_version_string(),
@@ -1598,6 +1719,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
                      std::unique_ptr<ServiceConfigWatcherInterface> watcher,
                      const grpc_channel_args& channel_args, grpc_error** error)
     : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
+      request_timeout_(GetRequestTimeout(channel_args)),
       build_version_(GenerateBuildVersionString()),
       combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
       interested_parties_(interested_parties),
@@ -1618,7 +1740,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
   chand_ = MakeOrphanable<ChannelState>(
       Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args);
   if (service_config_watcher_ != nullptr) {
-    chand_->OnResourceNamesChanged(kLdsTypeUrl);
+    chand_->Subscribe(kLdsTypeUrl, std::string(server_name));
   }
 }
 
@@ -1634,8 +1756,8 @@ void XdsClient::Orphan() {
 
 void XdsClient::WatchClusterData(
     StringView cluster_name, std::unique_ptr<ClusterWatcherInterface> watcher) {
-  const bool new_name = cluster_map_.find(cluster_name) == cluster_map_.end();
-  ClusterState& cluster_state = cluster_map_[cluster_name];
+  std::string cluster_name_str = std::string(cluster_name);
+  ClusterState& cluster_state = cluster_map_[cluster_name_str];
   ClusterWatcherInterface* w = watcher.get();
   cluster_state.watchers[w] = std::move(watcher);
   // If we've already received an CDS update, notify the new watcher
@@ -1643,30 +1765,29 @@ void XdsClient::WatchClusterData(
   if (cluster_state.update.has_value()) {
     w->OnClusterChanged(cluster_state.update.value());
   }
-  if (new_name) chand_->OnResourceNamesChanged(kCdsTypeUrl);
+  chand_->Subscribe(kCdsTypeUrl, cluster_name_str);
 }
 
 void XdsClient::CancelClusterDataWatch(StringView cluster_name,
                                        ClusterWatcherInterface* watcher) {
   if (shutting_down_) return;
-  ClusterState& cluster_state = cluster_map_[cluster_name];
+  std::string cluster_name_str = std::string(cluster_name);
+  ClusterState& cluster_state = cluster_map_[cluster_name_str];
   auto it = cluster_state.watchers.find(watcher);
   if (it != cluster_state.watchers.end()) {
     cluster_state.watchers.erase(it);
     if (cluster_state.watchers.empty()) {
-      cluster_map_.erase(cluster_name);
-      chand_->OnResourceNamesChanged(kCdsTypeUrl);
+      cluster_map_.erase(cluster_name_str);
+      chand_->Unsubscribe(kCdsTypeUrl, cluster_name_str);
     }
   }
-  chand_->OnWatcherRemoved();
 }
 
 void XdsClient::WatchEndpointData(
     StringView eds_service_name,
     std::unique_ptr<EndpointWatcherInterface> watcher) {
-  const bool new_name =
-      endpoint_map_.find(eds_service_name) == endpoint_map_.end();
-  EndpointState& endpoint_state = endpoint_map_[eds_service_name];
+  std::string eds_service_name_str = std::string(eds_service_name);
+  EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
   EndpointWatcherInterface* w = watcher.get();
   endpoint_state.watchers[w] = std::move(watcher);
   // If we've already received an EDS update, notify the new watcher
@@ -1674,28 +1795,28 @@ void XdsClient::WatchEndpointData(
   if (!endpoint_state.update.priority_list_update.empty()) {
     w->OnEndpointChanged(endpoint_state.update);
   }
-  if (new_name) chand_->OnResourceNamesChanged(kEdsTypeUrl);
+  chand_->Subscribe(kEdsTypeUrl, eds_service_name_str);
 }
 
 void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
                                         EndpointWatcherInterface* watcher) {
   if (shutting_down_) return;
-  EndpointState& endpoint_state = endpoint_map_[eds_service_name];
+  std::string eds_service_name_str = std::string(eds_service_name);
+  EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
   auto it = endpoint_state.watchers.find(watcher);
   if (it != endpoint_state.watchers.end()) {
     endpoint_state.watchers.erase(it);
     if (endpoint_state.watchers.empty()) {
-      endpoint_map_.erase(eds_service_name);
-      chand_->OnResourceNamesChanged(kEdsTypeUrl);
+      endpoint_map_.erase(eds_service_name_str);
+      chand_->Unsubscribe(kEdsTypeUrl, eds_service_name_str);
     }
   }
-  chand_->OnWatcherRemoved();
 }
 
 void XdsClient::AddClientStats(StringView /*lrs_server*/,
                                StringView cluster_name,
                                XdsClientStats* client_stats) {
-  EndpointState& endpoint_state = endpoint_map_[cluster_name];
+  EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
   // TODO(roth): When we add support for direct federation, use the
   // server name specified in lrs_server.
   endpoint_state.client_stats.insert(client_stats);
@@ -1705,7 +1826,7 @@ void XdsClient::AddClientStats(StringView /*lrs_server*/,
 void XdsClient::RemoveClientStats(StringView /*lrs_server*/,
                                   StringView cluster_name,
                                   XdsClientStats* client_stats) {
-  EndpointState& endpoint_state = endpoint_map_[cluster_name];
+  EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
   // TODO(roth): When we add support for direct federation, use the
   // server name specified in lrs_server.
   // TODO(roth): In principle, we should try to send a final load report
@@ -1745,32 +1866,11 @@ grpc_error* XdsClient::CreateServiceConfig(
   return error;
 }
 
-std::set<StringView> XdsClient::WatchedClusterNames() const {
-  std::set<StringView> cluster_names;
-  for (const auto& p : cluster_map_) {
-    const StringView& cluster_name = p.first;
-    const ClusterState& cluster_state = p.second;
-    // Don't request for the clusters that are cached before watched.
-    if (cluster_state.watchers.empty()) continue;
-    cluster_names.emplace(cluster_name);
-  }
-  return cluster_names;
-}
-
-std::set<StringView> XdsClient::EdsServiceNames() const {
-  std::set<StringView> eds_service_names;
-  for (const auto& p : endpoint_map_) {
-    const StringView& eds_service_name = p.first;
-    eds_service_names.emplace(eds_service_name);
-  }
-  return eds_service_names;
-}
-
-std::map<StringView, std::set<XdsClientStats*>> XdsClient::ClientStatsMap()
-    const {
-  std::map<StringView, std::set<XdsClientStats*>> client_stats_map;
+std::map<StringView, std::set<XdsClientStats*>, StringLess>
+XdsClient::ClientStatsMap() const {
+  std::map<StringView, std::set<XdsClientStats*>, StringLess> client_stats_map;
   for (const auto& p : endpoint_map_) {
-    const StringView& cluster_name = p.first;
+    const StringView cluster_name = p.first;
     const auto& client_stats = p.second.client_stats;
     if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) {
       client_stats_map.emplace(cluster_name, client_stats);
diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h
index 3e7df5b1581..0552440cd8c 100644
--- a/src/core/ext/filters/client_channel/xds/xds_client.h
+++ b/src/core/ext/filters/client_channel/xds/xds_client.h
@@ -153,8 +153,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     void StartConnectivityWatchLocked();
     void CancelConnectivityWatchLocked();
 
-    void OnResourceNamesChanged(const std::string& type_url);
-    void OnWatcherRemoved();
+    void Subscribe(const std::string& type_url, const std::string& name);
+    void Unsubscribe(const std::string& type_url, const std::string& name);
 
    private:
     class StateWatcher;
@@ -195,11 +195,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
       const std::string& cluster_name,
       RefCountedPtr<ServiceConfig>* service_config) const;
 
-  std::set<StringView> WatchedClusterNames() const;
-
-  std::set<StringView> EdsServiceNames() const;
-
-  std::map<StringView, std::set<XdsClientStats*>> ClientStatsMap() const;
+  std::map<StringView, std::set<XdsClientStats*>, StringLess> ClientStatsMap()
+      const;
 
   // Channel arg vtable functions.
   static void* ChannelArgCopy(void* p);
@@ -208,6 +205,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
   static const grpc_arg_pointer_vtable kXdsClientVtable;
 
+  const grpc_millis request_timeout_;
+
   grpc_core::UniquePtr<char> build_version_;
 
   Combiner* combiner_;
@@ -225,10 +224,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   std::string route_config_name_;
   std::string cluster_name_;
   // All the received clusters are cached, no matter they are watched or not.
-  std::map<StringView /*cluster_name*/, ClusterState, StringLess> cluster_map_;
+  std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
   // Only the watched EDS service names are stored.
-  std::map<StringView /*eds_service_name*/, EndpointState, StringLess>
-      endpoint_map_;
+  std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
 
   bool shutting_down_ = false;
 };
diff --git a/src/core/lib/gprpp/string_view.h b/src/core/lib/gprpp/string_view.h
index 5b03616c280..a718b79231c 100644
--- a/src/core/lib/gprpp/string_view.h
+++ b/src/core/lib/gprpp/string_view.h
@@ -75,6 +75,11 @@ class StringView final {
       : StringView(ptr, ptr == nullptr ? 0 : strlen(ptr)) {}
   constexpr StringView() : StringView(nullptr, 0) {}
 
+  template <typename Allocator>
+  StringView(
+      const std::basic_string<char, std::char_traits<char>, Allocator>& str)
+      : StringView(str.data(), str.size()) {}
+
   constexpr const char* data() const { return ptr_; }
   constexpr size_t size() const { return size_; }
   constexpr bool empty() const { return size_ == 0; }
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 58fd8fcd78f..71977f82493 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -427,6 +427,7 @@ class AdsServiceImpl : public AdsService {
     const std::string version_str = "version_1";
     const std::string nonce_str = "nonce_1";
     grpc_core::MutexLock lock(&ads_mu_);
+    if (lds_ignore_) return;
     if (lds_response_state_ == NOT_SENT) {
       DiscoveryResponse response;
       response.set_type_url(kLdsTypeUrl);
@@ -452,6 +453,7 @@ class AdsServiceImpl : public AdsService {
     const std::string version_str = "version_1";
     const std::string nonce_str = "nonce_1";
     grpc_core::MutexLock lock(&ads_mu_);
+    if (rds_ignore_) return;
     if (rds_response_state_ == NOT_SENT) {
       DiscoveryResponse response;
       response.set_type_url(kRdsTypeUrl);
@@ -477,6 +479,7 @@ class AdsServiceImpl : public AdsService {
     const std::string version_str = "version_1";
     const std::string nonce_str = "nonce_1";
     grpc_core::MutexLock lock(&ads_mu_);
+    if (cds_ignore_) return;
     if (cds_response_state_ == NOT_SENT) {
       DiscoveryResponse response;
       response.set_type_url(kCdsTypeUrl);
@@ -503,6 +506,7 @@ class AdsServiceImpl : public AdsService {
     std::vector<ResponseDelayPair> responses_and_delays;
     {
       grpc_core::MutexLock lock(&ads_mu_);
+      if (eds_ignore_) return;
       responses_and_delays = eds_responses_and_delays_;
     }
     // Send response.
@@ -538,8 +542,13 @@ class AdsServiceImpl : public AdsService {
       // resource names). It's not causing a big problem now but should be
       // fixed.
       bool eds_sent = false;
+      bool seen_first_request = false;
       while (!eds_sent || cds_response_state_ == SENT) {
         if (!stream->Read(&request)) return;
+        if (!seen_first_request) {
+          EXPECT_TRUE(request.has_node());
+          seen_first_request = true;
+        }
         if (request.type_url() == kLdsTypeUrl) {
           HandleLdsRequest(&request, stream);
         } else if (request.type_url() == kRdsTypeUrl) {
@@ -585,23 +594,31 @@ class AdsServiceImpl : public AdsService {
     lds_response_data_ = std::move(lds_response_data);
   }
 
+  void set_lds_ignore() { lds_ignore_ = true; }
+
   void SetRdsResponse(
       std::map<std::string /*route_config_name*/, RouteConfiguration>
           rds_response_data) {
     rds_response_data_ = std::move(rds_response_data);
   }
 
+  void set_rds_ignore() { rds_ignore_ = true; }
+
   void SetCdsResponse(
       std::map<std::string /*cluster_name*/, Cluster> cds_response_data) {
     cds_response_data_ = std::move(cds_response_data);
   }
 
+  void set_cds_ignore() { cds_ignore_ = true; }
+
   void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) {
     grpc_core::MutexLock lock(&ads_mu_);
     eds_responses_and_delays_.push_back(
         std::make_pair(response, send_after_ms));
   }
 
+  void set_eds_ignore() { eds_ignore_ = true; }
+
   void SetLdsToUseDynamicRds() {
     auto listener = default_listener_;
     HttpConnectionManager http_connection_manager;
@@ -701,17 +718,21 @@ class AdsServiceImpl : public AdsService {
   Listener default_listener_;
   std::map<std::string /*server_name*/, Listener> lds_response_data_;
   ResponseState lds_response_state_ = NOT_SENT;
+  bool lds_ignore_ = false;
   // RDS response data.
   RouteConfiguration default_route_config_;
   std::map<std::string /*route_config_name*/, RouteConfiguration>
       rds_response_data_;
   ResponseState rds_response_state_ = NOT_SENT;
+  bool rds_ignore_ = false;
   // CDS response data.
   Cluster default_cluster_;
   std::map<std::string /*cluster_name*/, Cluster> cds_response_data_;
   ResponseState cds_response_state_ = NOT_SENT;
+  bool cds_ignore_ = false;
   // EDS response data.
   std::vector<ResponseDelayPair> eds_responses_and_delays_;
+  bool eds_ignore_ = false;
 };
 
 class LrsServiceImpl : public LrsService {
@@ -895,7 +916,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
   void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
 
   void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
-                 const grpc::string& expected_targets = "") {
+                 const grpc::string& expected_targets = "",
+                 int xds_resource_does_not_exist_timeout = 0) {
     ChannelArguments args;
     // TODO(juanlishen): Add setter to ChannelArguments.
     if (fallback_timeout > 0) {
@@ -904,6 +926,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     if (failover_timeout > 0) {
       args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout);
     }
+    if (xds_resource_does_not_exist_timeout > 0) {
+      args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
+                  xds_resource_does_not_exist_timeout);
+    }
     // If the parent channel is using the fake resolver, we inject the
     // response generator for the parent here, and then SetNextResolution()
     // will inject the xds channel's response generator via the parent's
@@ -1625,6 +1651,15 @@ TEST_P(LdsTest, RouteActionHasNoCluster) {
             AdsServiceImpl::NACKED);
 }
 
+// Tests that LDS client times out when no response received.
+TEST_P(LdsTest, Timeout) {
+  ResetStub(0, 0, "", 500);
+  balancers_[0]->ads_service()->set_lds_ignore();
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+}
+
 using RdsTest = BasicTest;
 
 // Tests that RDS client should send an ACK upon correct RDS response.
@@ -1751,6 +1786,16 @@ TEST_P(RdsTest, RouteActionHasNoCluster) {
             AdsServiceImpl::NACKED);
 }
 
+// Tests that RDS client times out when no response received.
+TEST_P(RdsTest, Timeout) {
+  ResetStub(0, 0, "", 500);
+  balancers_[0]->ads_service()->SetLdsToUseDynamicRds();
+  balancers_[0]->ads_service()->set_rds_ignore();
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+}
+
 using CdsTest = BasicTest;
 
 // Tests that CDS client should send an ACK upon correct CDS response.
@@ -1818,6 +1863,27 @@ TEST_P(CdsTest, WrongLrsServer) {
             AdsServiceImpl::NACKED);
 }
 
+// Tests that CDS client times out when no response received.
+TEST_P(CdsTest, Timeout) {
+  ResetStub(0, 0, "", 500);
+  balancers_[0]->ads_service()->set_cds_ignore();
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+}
+
+using EdsTest = BasicTest;
+
+// TODO(roth): Add tests showing that RPCs fail when EDS data is invalid.
+
+TEST_P(EdsTest, Timeout) {
+  ResetStub(0, 0, "", 500);
+  balancers_[0]->ads_service()->set_eds_ignore();
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+}
+
 using LocalityMapTest = BasicTest;
 
 // Tests that the localities in a locality map are picked according to their
@@ -3031,6 +3097,13 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest,
                                            TestType(true, true)),
                          &TestTypeName);
 
+// EDS could be tested with or without XdsResolver, but the tests would
+// be the same either way, so we test it only with XdsResolver.
+INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest,
+                         ::testing::Values(TestType(true, false),
+                                           TestType(true, true)),
+                         &TestTypeName);
+
 INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),