From d7a91d59bbde2b14ab398578f59a3d22c70ebcab Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 27 Jan 2020 11:27:44 -0800 Subject: [PATCH] Support xds request timeouts. --- include/grpc/impl/codegen/grpc_types.h | 5 + .../resolver/xds/xds_resolver.cc | 2 + .../ext/filters/client_channel/xds/xds_api.cc | 6 +- .../ext/filters/client_channel/xds/xds_api.h | 15 +- .../filters/client_channel/xds/xds_client.cc | 446 +++++++++++------- .../filters/client_channel/xds/xds_client.h | 18 +- src/core/lib/gprpp/string_view.h | 5 + test/cpp/end2end/xds_end2end_test.cc | 75 ++- 8 files changed, 372 insertions(+), 200 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 89fd15faf1a..ab4c39f9310 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -360,6 +360,11 @@ typedef struct { of that priority fail to connect. If 0, failover happens immediately. Default value is 10 seconds. */ #define GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS "grpc.xds_failover_timeout_ms" +/* Timeout in milliseconds to wait for a resource to be returned from + * the xds server before assuming that it does not exist. + * The default is 15 seconds. */ +#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \ + "grpc.xds_resource_does_not_exist_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 32f5e5c3523..e7eed60c446 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -68,6 +68,7 @@ class XdsResolver : public Resolver { void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged( RefCountedPtr service_config) { + if (resolver_->xds_client_ == nullptr) return; grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg(); Result result; result.args = @@ -77,6 +78,7 @@ void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged( } void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) { + if (resolver_->xds_client_ == nullptr) return; grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg(); Result result; result.args = diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index 5b27c5d5904..dc7d10a7f4c 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -1119,10 +1119,12 @@ void LocalityStatsPopulate( } // namespace grpc_slice XdsLrsRequestCreateAndEncode( - std::map> client_stats_map) { + std::map, StringLess> + client_stats_map) { upb::Arena arena; // Get the snapshots. - std::map> + std::map, + StringLess> snapshot_map; for (auto& p : client_stats_map) { const StringView& cluster_name = p.first; diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 48c7282772f..f98d32b1e38 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -41,19 +41,6 @@ constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster"; constexpr char kEdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; -// The version state for each specific ADS resource type. -struct VersionState { - // The version of the latest response that is accepted and used. - std::string version_info; - // The nonce of the latest response. - std::string nonce; - // The error message to be included in a NACK with the nonce. Consumed when a - // nonce is NACK'ed for the first time. - grpc_error* error = GRPC_ERROR_NONE; - - ~VersionState() { GRPC_ERROR_UNREF(error); } -}; - struct RdsUpdate { // The name to use in the CDS request. std::string cluster_name; @@ -246,7 +233,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name, // Creates an LRS request sending client-side load reports. If all the counters // are zero, returns empty slice. grpc_slice XdsLrsRequestCreateAndEncode( - std::map> + std::map, StringLess> client_stats_map); // Parses the LRS response and returns \a diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 52bda5aa8f3..7e3c7ba04ec 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -125,31 +125,122 @@ class XdsClient::ChannelState::AdsCallState XdsClient* xds_client() const { return chand()->xds_client(); } bool seen_response() const { return seen_response_; } - // If \a type_url is an unsupported type, \a nonce_for_unsupported_type and - // \a error_for_unsupported_type will be used in the request; otherwise, the - // nonce and error stored in each ADS call state will be used. Takes ownership - // of \a error_for_unsupported_type. - void SendMessageLocked(const std::string& type_url, - const std::string& nonce_for_unsupported_type, - grpc_error* error_for_unsupported_type, - bool is_first_message); + void Subscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name); + + bool HasSubscribedResources() const; private: - struct BufferedRequest { - std::string nonce; - grpc_error* error; + class ResourceState : public InternallyRefCounted { + public: + ResourceState(const std::string& type_url, const std::string& name) + : type_url_(type_url), name_(name) { + GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, + grpc_schedule_on_exec_ctx); + } + + void Orphan() override { + Finish(); + Unref(); + } + + void Start(RefCountedPtr ads_calld) { + if (sent_) return; + sent_ = true; + ads_calld_ = std::move(ads_calld); + Ref().release(); + timer_pending_ = true; + grpc_timer_init( + &timer_, + ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_, + &timer_callback_); + } + + void Finish() { + if (timer_pending_) { + grpc_timer_cancel(&timer_); + timer_pending_ = false; + } + } - // Takes ownership of \a error. - BufferedRequest(std::string nonce, grpc_error* error) - : nonce(std::move(nonce)), error(error) {} + private: + static void OnTimer(void* arg, grpc_error* error) { + ResourceState* self = static_cast(arg); + self->ads_calld_->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self, + nullptr), + GRPC_ERROR_REF(error)); + } - ~BufferedRequest() { GRPC_ERROR_UNREF(error); } + static void OnTimerLocked(void* arg, grpc_error* error) { + ResourceState* self = static_cast(arg); + if (error == GRPC_ERROR_NONE && self->timer_pending_) { + self->timer_pending_ = false; + char* msg; + gpr_asprintf( + &msg, + "timeout obtaining resource {type=%s name=%s} from xds server", + self->type_url_.c_str(), self->name_.c_str()); + grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] %s", + self->ads_calld_->xds_client(), grpc_error_string(error)); + } + if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) { + self->ads_calld_->xds_client()->service_config_watcher_->OnError( + error); + } else if (self->type_url_ == kCdsTypeUrl) { + ClusterState& state = + self->ads_calld_->xds_client()->cluster_map_[self->name_]; + for (const auto& p : state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + } else if (self->type_url_ == kEdsTypeUrl) { + EndpointState& state = + self->ads_calld_->xds_client()->endpoint_map_[self->name_]; + for (const auto& p : state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + } else { + GPR_UNREACHABLE_CODE(return ); + } + } + self->ads_calld_.reset(); + self->Unref(); + } + + const std::string type_url_; + const std::string name_; + + RefCountedPtr ads_calld_; + bool sent_ = false; + bool timer_pending_ = false; + grpc_timer timer_; + grpc_closure timer_callback_; }; - void AcceptLdsUpdate(LdsUpdate lds_update, std::string new_version); - void AcceptRdsUpdate(RdsUpdate rds_update, std::string new_version); - void AcceptCdsUpdate(CdsUpdateMap cds_update_map, std::string new_version); - void AcceptEdsUpdate(EdsUpdateMap eds_update_map, std::string new_version); + struct ResourceTypeState { + ~ResourceTypeState() { GRPC_ERROR_UNREF(error); } + + // Version, nonce, and error for this resource type. + std::string version; + std::string nonce; + grpc_error* error = GRPC_ERROR_NONE; + + // Subscribed resources of this type. + std::map> + subscribed_resources; + }; + + void SendMessageLocked(const std::string& type_url); + + void AcceptLdsUpdate(LdsUpdate lds_update); + void AcceptRdsUpdate(RdsUpdate rds_update); + void AcceptCdsUpdate(CdsUpdateMap cds_update_map); + void AcceptEdsUpdate(EdsUpdateMap eds_update_map); static void OnRequestSent(void* arg, grpc_error* error); static void OnRequestSentLocked(void* arg, grpc_error* error); @@ -160,8 +251,13 @@ class XdsClient::ChannelState::AdsCallState bool IsCurrentCallOnChannel() const; + std::set ClusterNamesForRequest(); + std::set EdsServiceNamesForRequest(); + // The owning RetryableCall<>. RefCountedPtr> parent_; + + bool sent_initial_message_ = false; bool seen_response_ = false; // Always non-NULL. @@ -184,15 +280,11 @@ class XdsClient::ChannelState::AdsCallState grpc_slice status_details_; grpc_closure on_status_received_; - // Version state. - VersionState lds_version_; - VersionState rds_version_; - VersionState cds_version_; - VersionState eds_version_; + // Resource types for which requests need to be sent. + std::set buffered_requests_; - // Buffered requests. - std::map> - buffered_request_map_; + // State for each resource type. + std::map state_map_; }; // Contains an LRS call to the xds server. @@ -445,31 +537,30 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); } -void XdsClient::ChannelState::OnResourceNamesChanged( - const std::string& type_url) { +void XdsClient::ChannelState::Subscribe(const std::string& type_url, + const std::string& name) { if (ads_calld_ == nullptr) { // Start the ADS call if this is the first request. ads_calld_.reset(new RetryableCall( Ref(DEBUG_LOCATION, "ChannelState+ads"))); - // Note: AdsCallState's ctor will automatically send necessary messages, so - // we can return here. + // Note: AdsCallState's ctor will automatically subscribe to all + // resources that the XdsClient already has watchers for, so we can + // return here. return; } // If the ADS call is in backoff state, we don't need to do anything now // because when the call is restarted it will resend all necessary requests. if (ads_calld() == nullptr) return; - // Send the message if the ADS call is active. - ads_calld()->SendMessageLocked(type_url, "", nullptr, false); + // Subscribe to this resource if the ADS call is active. + ads_calld()->Subscribe(type_url, name); } -void XdsClient::ChannelState::OnWatcherRemoved() { - // Keep the ADS call if there are watcher(s). - for (const auto& p : xds_client()->cluster_map_) { - const ClusterState& cluster_state = p.second; - if (!cluster_state.watchers.empty()) return; +void XdsClient::ChannelState::Unsubscribe(const std::string& type_url, + const std::string& name) { + if (ads_calld_ != nullptr) { + ads_calld_->calld()->Unsubscribe(type_url, name); + if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset(); } - if (!xds_client()->endpoint_map_.empty()) return; - ads_calld_.reset(); } // @@ -620,22 +711,14 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // Op: send request message. GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, grpc_schedule_on_exec_ctx); - bool initial_message = true; if (xds_client()->service_config_watcher_ != nullptr) { - if (xds_client()->route_config_name_.empty()) { - SendMessageLocked(kLdsTypeUrl, "", nullptr, initial_message); - initial_message = false; - } else if (xds_client()->cluster_name_.empty()) { - SendMessageLocked(kRdsTypeUrl, "", nullptr, initial_message); - initial_message = false; - } + Subscribe(kLdsTypeUrl, xds_client()->server_name_); } - if (!xds_client()->cluster_map_.empty()) { - SendMessageLocked(kCdsTypeUrl, "", nullptr, initial_message); - initial_message = false; + for (const auto& p : xds_client()->cluster_map_) { + Subscribe(kCdsTypeUrl, std::string(p.first)); } - if (!xds_client()->endpoint_map_.empty()) { - SendMessageLocked(kEdsTypeUrl, "", nullptr, initial_message); + for (const auto& p : xds_client()->endpoint_map_) { + Subscribe(kEdsTypeUrl, std::string(p.first)); } // Op: recv initial metadata. op = ops; @@ -693,51 +776,49 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { // we are here because xds_client has to orphan a failed call, then the // following cancellation will be a no-op. grpc_call_cancel(call_, nullptr); + state_map_.clear(); // Note that the initial ref is hold by on_status_received_. So the // corresponding unref happens in on_status_received_ instead of here. } void XdsClient::ChannelState::AdsCallState::SendMessageLocked( - const std::string& type_url, const std::string& nonce_for_unsupported_type, - grpc_error* error_for_unsupported_type, bool is_first_message) { + const std::string& type_url) { // Buffer message sending if an existing message is in flight. if (send_message_payload_ != nullptr) { - buffered_request_map_[type_url].reset(new BufferedRequest( - nonce_for_unsupported_type, error_for_unsupported_type)); + buffered_requests_.insert(type_url); return; } - grpc_slice request_payload_slice; + auto& state = state_map_[type_url]; + grpc_error* error = state.error; + state.error = GRPC_ERROR_NONE; const XdsBootstrap::Node* node = - is_first_message ? xds_client()->bootstrap_->node() : nullptr; + sent_initial_message_ ? nullptr : xds_client()->bootstrap_->node(); const char* build_version = - is_first_message ? xds_client()->build_version_.get() : nullptr; + sent_initial_message_ ? nullptr : xds_client()->build_version_.get(); + sent_initial_message_ = true; + grpc_slice request_payload_slice; if (type_url == kLdsTypeUrl) { request_payload_slice = XdsLdsRequestCreateAndEncode( - xds_client()->server_name_, node, build_version, - lds_version_.version_info, lds_version_.nonce, lds_version_.error); - lds_version_.error = GRPC_ERROR_NONE; - GRPC_ERROR_UNREF(error_for_unsupported_type); + xds_client()->server_name_, node, build_version, state.version, + state.nonce, error); + state.subscribed_resources[xds_client()->server_name_]->Start(Ref()); } else if (type_url == kRdsTypeUrl) { request_payload_slice = XdsRdsRequestCreateAndEncode( - xds_client()->route_config_name_, node, build_version, - rds_version_.version_info, rds_version_.nonce, rds_version_.error); - rds_version_.error = GRPC_ERROR_NONE; - GRPC_ERROR_UNREF(error_for_unsupported_type); + xds_client()->route_config_name_, node, build_version, state.version, + state.nonce, error); + state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref()); } else if (type_url == kCdsTypeUrl) { request_payload_slice = XdsCdsRequestCreateAndEncode( - xds_client()->WatchedClusterNames(), node, build_version, - cds_version_.version_info, cds_version_.nonce, cds_version_.error); - cds_version_.error = GRPC_ERROR_NONE; - GRPC_ERROR_UNREF(error_for_unsupported_type); + ClusterNamesForRequest(), node, build_version, state.version, + state.nonce, error); } else if (type_url == kEdsTypeUrl) { request_payload_slice = XdsEdsRequestCreateAndEncode( - xds_client()->EdsServiceNames(), node, build_version, - eds_version_.version_info, eds_version_.nonce, eds_version_.error); - eds_version_.error = GRPC_ERROR_NONE; - GRPC_ERROR_UNREF(error_for_unsupported_type); + EdsServiceNamesForRequest(), node, build_version, state.version, + state.nonce, error); } else { request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode( - type_url, nonce_for_unsupported_type, error_for_unsupported_type); + type_url, state.nonce, state.error); + state_map_.erase(type_url); } // Create message payload. send_message_payload_ = @@ -761,8 +842,30 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } } +void XdsClient::ChannelState::AdsCallState::Subscribe( + const std::string& type_url, const std::string& name) { + auto& state = state_map_[type_url].subscribed_resources[name]; + if (state == nullptr) { + state = MakeOrphanable(type_url, name); + SendMessageLocked(type_url); + } +} + +void XdsClient::ChannelState::AdsCallState::Unsubscribe( + const std::string& type_url, const std::string& name) { + state_map_[type_url].subscribed_resources.erase(name); + SendMessageLocked(type_url); +} + +bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { + for (const auto& p : state_map_) { + if (!p.second.subscribed_resources.empty()) return true; + } + return false; +} + void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( - LdsUpdate lds_update, std::string new_version) { + LdsUpdate lds_update) { const std::string& cluster_name = lds_update.rds_update.has_value() ? lds_update.rds_update.value().cluster_name @@ -775,6 +878,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( xds_client(), lds_update.route_config_name.c_str(), cluster_name.c_str()); } + auto& lds_state = state_map_[kLdsTypeUrl]; + auto& state = lds_state.subscribed_resources[xds_client()->server_name_]; + if (state != nullptr) state->Finish(); // Ignore identical update. if (xds_client()->route_config_name_ == lds_update.route_config_name && xds_client()->cluster_name_ == cluster_name) { @@ -802,19 +908,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } } else { // Send RDS request for dynamic resolution. - SendMessageLocked(kRdsTypeUrl, "", nullptr, false); + Subscribe(kRdsTypeUrl, xds_client()->route_config_name_); } - lds_version_.version_info = std::move(new_version); } void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( - RdsUpdate rds_update, std::string new_version) { + RdsUpdate rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] RDS update received: " "cluster_name=%s", xds_client(), rds_update.cluster_name.c_str()); } + auto& rds_state = state_map_[kRdsTypeUrl]; + auto& state = + rds_state.subscribed_resources[xds_client()->route_config_name_]; + if (state != nullptr) state->Finish(); // Ignore identical update. if (xds_client()->cluster_name_ == rds_update.cluster_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -835,14 +944,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( } else { xds_client()->service_config_watcher_->OnError(error); } - rds_version_.version_info = std::move(new_version); } void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( - CdsUpdateMap cds_update_map, std::string new_version) { + CdsUpdateMap cds_update_map) { + auto& cds_state = state_map_[kCdsTypeUrl]; for (auto& p : cds_update_map) { const char* cluster_name = p.first.c_str(); CdsUpdate& cds_update = p.second; + auto& state = cds_state.subscribed_resources[cluster_name]; + if (state != nullptr) state->Finish(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update (cluster=%s) received: " @@ -875,14 +986,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( p.first->OnClusterChanged(cluster_state.update.value()); } } - cds_version_.version_info = std::move(new_version); } void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( - EdsUpdateMap eds_update_map, std::string new_version) { + EdsUpdateMap eds_update_map) { + auto& eds_state = state_map_[kEdsTypeUrl]; for (auto& p : eds_update_map) { const char* eds_service_name = p.first.c_str(); EdsUpdate& eds_update = p.second; + auto& state = eds_state.subscribed_resources[eds_service_name]; + if (state != nullptr) state->Finish(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] EDS response with %" PRIuPTR @@ -956,7 +1069,6 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( p.first->OnEndpointChanged(endpoint_state.update); } } - eds_version_.version_info = std::move(new_version); } void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg, @@ -977,22 +1089,17 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( self->send_message_payload_ = nullptr; // Continue to send another pending message if any. // TODO(roth): The current code to handle buffered messages has the - // advantage of sending only the most recent list of resource names for each - // resource type (no matter how many times that resource type has been - // requested to send while the current message sending is still pending). - // But its disadvantage is that we send the requests in fixed order of - // resource types. We need to fix this if we are seeing some resource - // type(s) starved due to frequent requests of other resource type(s). - for (auto& p : self->buffered_request_map_) { - const std::string& type_url = p.first; - std::unique_ptr& buffered_request = p.second; - if (buffered_request != nullptr) { - self->SendMessageLocked(type_url, buffered_request->nonce, - buffered_request->error, false); - buffered_request->error = GRPC_ERROR_NONE; - buffered_request.reset(); - break; - } + // advantage of sending only the most recent list of resource names for + // each resource type (no matter how many times that resource type has + // been requested to send while the current message sending is still + // pending). But its disadvantage is that we send the requests in fixed + // order of resource types. We need to fix this if we are seeing some + // resource type(s) starved due to frequent requests of other resource + // type(s). + auto it = self->buffered_requests_.begin(); + if (it != self->buffered_requests_.end()) { + self->SendMessageLocked(*it); + self->buffered_requests_.erase(it); } } self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); @@ -1043,8 +1150,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( // Note that XdsAdsResponseDecodeAndParse() also validate the response. grpc_error* parse_error = XdsAdsResponseDecodeAndParse( response_slice, xds_client->server_name_, xds_client->route_config_name_, - xds_client->EdsServiceNames(), &lds_update, &rds_update, &cds_update_map, - &eds_update_map, &version, &nonce, &type_url); + ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update, + &cds_update_map, &eds_update_map, &version, &nonce, &type_url); grpc_slice_unref_internal(response_slice); if (type_url.empty()) { // Ignore unparsable response. @@ -1052,48 +1159,34 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( xds_client, grpc_error_string(parse_error)); GRPC_ERROR_UNREF(parse_error); } else { - // Update nonce and error. - if (type_url == kLdsTypeUrl) { - ads_calld->lds_version_.nonce = nonce; - GRPC_ERROR_UNREF(ads_calld->lds_version_.error); - ads_calld->lds_version_.error = GRPC_ERROR_REF(parse_error); - } else if (type_url == kRdsTypeUrl) { - ads_calld->rds_version_.nonce = nonce; - GRPC_ERROR_UNREF(ads_calld->rds_version_.error); - ads_calld->rds_version_.error = GRPC_ERROR_REF(parse_error); - } else if (type_url == kCdsTypeUrl) { - ads_calld->cds_version_.nonce = nonce; - GRPC_ERROR_UNREF(ads_calld->cds_version_.error); - ads_calld->cds_version_.error = GRPC_ERROR_REF(parse_error); - } else if (type_url == kEdsTypeUrl) { - ads_calld->eds_version_.nonce = nonce; - GRPC_ERROR_UNREF(ads_calld->eds_version_.error); - ads_calld->eds_version_.error = GRPC_ERROR_REF(parse_error); - } + // Update nonce. + auto& state = ads_calld->state_map_[type_url]; + state.nonce = std::move(nonce); // NACK or ACK the response. if (parse_error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(state.error); + state.error = parse_error; // NACK unacceptable update. gpr_log( GPR_ERROR, "[xds_client %p] ADS response can't be accepted, NACKing. error=%s", xds_client, grpc_error_string(parse_error)); - ads_calld->SendMessageLocked(type_url, nonce, parse_error, false); + ads_calld->SendMessageLocked(type_url); } else { ads_calld->seen_response_ = true; // Accept the ADS response according to the type_url. if (type_url == kLdsTypeUrl) { - ads_calld->AcceptLdsUpdate(std::move(lds_update), std::move(version)); + ads_calld->AcceptLdsUpdate(std::move(lds_update)); } else if (type_url == kRdsTypeUrl) { - ads_calld->AcceptRdsUpdate(std::move(rds_update), std::move(version)); + ads_calld->AcceptRdsUpdate(std::move(rds_update)); } else if (type_url == kCdsTypeUrl) { - ads_calld->AcceptCdsUpdate(std::move(cds_update_map), - std::move(version)); + ads_calld->AcceptCdsUpdate(std::move(cds_update_map)); } else if (type_url == kEdsTypeUrl) { - ads_calld->AcceptEdsUpdate(std::move(eds_update_map), - std::move(version)); + ads_calld->AcceptEdsUpdate(std::move(eds_update_map)); } + state.version = std::move(version); // ACK the update. - ads_calld->SendMessageLocked(type_url, nonce, nullptr, false); + ads_calld->SendMessageLocked(type_url); // Start load reporting if needed. auto& lrs_call = ads_calld->chand()->lrs_calld_; if (lrs_call != nullptr) { @@ -1164,6 +1257,28 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { return this == chand()->ads_calld_->calld(); } +std::set +XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() { + std::set cluster_names; + for (auto& p : state_map_[kCdsTypeUrl].subscribed_resources) { + cluster_names.insert(p.first); + OrphanablePtr& state = p.second; + state->Start(Ref()); + } + return cluster_names; +} + +std::set +XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() { + std::set eds_names; + for (auto& p : state_map_[kEdsTypeUrl].subscribed_resources) { + eds_names.insert(p.first); + OrphanablePtr& state = p.second; + state->Start(Ref()); + } + return eds_names; +} + // // XdsClient::ChannelState::LrsCallState::Reporter // @@ -1584,6 +1699,12 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { namespace { +grpc_millis GetRequestTimeout(const grpc_channel_args& args) { + return grpc_channel_args_find_integer( + &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, + {15000, 0, INT_MAX}); +} + UniquePtr GenerateBuildVersionString() { char* build_version_str; gpr_asprintf(&build_version_str, "gRPC C-core %s %s", grpc_version_string(), @@ -1598,6 +1719,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, std::unique_ptr watcher, const grpc_channel_args& channel_args, grpc_error** error) : InternallyRefCounted(&grpc_xds_client_trace), + request_timeout_(GetRequestTimeout(channel_args)), build_version_(GenerateBuildVersionString()), combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), interested_parties_(interested_parties), @@ -1618,7 +1740,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, chand_ = MakeOrphanable( Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args); if (service_config_watcher_ != nullptr) { - chand_->OnResourceNamesChanged(kLdsTypeUrl); + chand_->Subscribe(kLdsTypeUrl, std::string(server_name)); } } @@ -1634,8 +1756,8 @@ void XdsClient::Orphan() { void XdsClient::WatchClusterData( StringView cluster_name, std::unique_ptr watcher) { - const bool new_name = cluster_map_.find(cluster_name) == cluster_map_.end(); - ClusterState& cluster_state = cluster_map_[cluster_name]; + std::string cluster_name_str = std::string(cluster_name); + ClusterState& cluster_state = cluster_map_[cluster_name_str]; ClusterWatcherInterface* w = watcher.get(); cluster_state.watchers[w] = std::move(watcher); // If we've already received an CDS update, notify the new watcher @@ -1643,30 +1765,29 @@ void XdsClient::WatchClusterData( if (cluster_state.update.has_value()) { w->OnClusterChanged(cluster_state.update.value()); } - if (new_name) chand_->OnResourceNamesChanged(kCdsTypeUrl); + chand_->Subscribe(kCdsTypeUrl, cluster_name_str); } void XdsClient::CancelClusterDataWatch(StringView cluster_name, ClusterWatcherInterface* watcher) { if (shutting_down_) return; - ClusterState& cluster_state = cluster_map_[cluster_name]; + std::string cluster_name_str = std::string(cluster_name); + ClusterState& cluster_state = cluster_map_[cluster_name_str]; auto it = cluster_state.watchers.find(watcher); if (it != cluster_state.watchers.end()) { cluster_state.watchers.erase(it); if (cluster_state.watchers.empty()) { - cluster_map_.erase(cluster_name); - chand_->OnResourceNamesChanged(kCdsTypeUrl); + cluster_map_.erase(cluster_name_str); + chand_->Unsubscribe(kCdsTypeUrl, cluster_name_str); } } - chand_->OnWatcherRemoved(); } void XdsClient::WatchEndpointData( StringView eds_service_name, std::unique_ptr watcher) { - const bool new_name = - endpoint_map_.find(eds_service_name) == endpoint_map_.end(); - EndpointState& endpoint_state = endpoint_map_[eds_service_name]; + std::string eds_service_name_str = std::string(eds_service_name); + EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; EndpointWatcherInterface* w = watcher.get(); endpoint_state.watchers[w] = std::move(watcher); // If we've already received an EDS update, notify the new watcher @@ -1674,28 +1795,28 @@ void XdsClient::WatchEndpointData( if (!endpoint_state.update.priority_list_update.empty()) { w->OnEndpointChanged(endpoint_state.update); } - if (new_name) chand_->OnResourceNamesChanged(kEdsTypeUrl); + chand_->Subscribe(kEdsTypeUrl, eds_service_name_str); } void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, EndpointWatcherInterface* watcher) { if (shutting_down_) return; - EndpointState& endpoint_state = endpoint_map_[eds_service_name]; + std::string eds_service_name_str = std::string(eds_service_name); + EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; auto it = endpoint_state.watchers.find(watcher); if (it != endpoint_state.watchers.end()) { endpoint_state.watchers.erase(it); if (endpoint_state.watchers.empty()) { - endpoint_map_.erase(eds_service_name); - chand_->OnResourceNamesChanged(kEdsTypeUrl); + endpoint_map_.erase(eds_service_name_str); + chand_->Unsubscribe(kEdsTypeUrl, eds_service_name_str); } } - chand_->OnWatcherRemoved(); } void XdsClient::AddClientStats(StringView /*lrs_server*/, StringView cluster_name, XdsClientStats* client_stats) { - EndpointState& endpoint_state = endpoint_map_[cluster_name]; + EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)]; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. endpoint_state.client_stats.insert(client_stats); @@ -1705,7 +1826,7 @@ void XdsClient::AddClientStats(StringView /*lrs_server*/, void XdsClient::RemoveClientStats(StringView /*lrs_server*/, StringView cluster_name, XdsClientStats* client_stats) { - EndpointState& endpoint_state = endpoint_map_[cluster_name]; + EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)]; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. // TODO(roth): In principle, we should try to send a final load report @@ -1745,32 +1866,11 @@ grpc_error* XdsClient::CreateServiceConfig( return error; } -std::set XdsClient::WatchedClusterNames() const { - std::set cluster_names; - for (const auto& p : cluster_map_) { - const StringView& cluster_name = p.first; - const ClusterState& cluster_state = p.second; - // Don't request for the clusters that are cached before watched. - if (cluster_state.watchers.empty()) continue; - cluster_names.emplace(cluster_name); - } - return cluster_names; -} - -std::set XdsClient::EdsServiceNames() const { - std::set eds_service_names; - for (const auto& p : endpoint_map_) { - const StringView& eds_service_name = p.first; - eds_service_names.emplace(eds_service_name); - } - return eds_service_names; -} - -std::map> XdsClient::ClientStatsMap() - const { - std::map> client_stats_map; +std::map, StringLess> +XdsClient::ClientStatsMap() const { + std::map, StringLess> client_stats_map; for (const auto& p : endpoint_map_) { - const StringView& cluster_name = p.first; + const StringView cluster_name = p.first; const auto& client_stats = p.second.client_stats; if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) { client_stats_map.emplace(cluster_name, client_stats); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 3e7df5b1581..0552440cd8c 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -153,8 +153,8 @@ class XdsClient : public InternallyRefCounted { void StartConnectivityWatchLocked(); void CancelConnectivityWatchLocked(); - void OnResourceNamesChanged(const std::string& type_url); - void OnWatcherRemoved(); + void Subscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name); private: class StateWatcher; @@ -195,11 +195,8 @@ class XdsClient : public InternallyRefCounted { const std::string& cluster_name, RefCountedPtr* service_config) const; - std::set WatchedClusterNames() const; - - std::set EdsServiceNames() const; - - std::map> ClientStatsMap() const; + std::map, StringLess> ClientStatsMap() + const; // Channel arg vtable functions. static void* ChannelArgCopy(void* p); @@ -208,6 +205,8 @@ class XdsClient : public InternallyRefCounted { static const grpc_arg_pointer_vtable kXdsClientVtable; + const grpc_millis request_timeout_; + grpc_core::UniquePtr build_version_; Combiner* combiner_; @@ -225,10 +224,9 @@ class XdsClient : public InternallyRefCounted { std::string route_config_name_; std::string cluster_name_; // All the received clusters are cached, no matter they are watched or not. - std::map cluster_map_; + std::map cluster_map_; // Only the watched EDS service names are stored. - std::map - endpoint_map_; + std::map endpoint_map_; bool shutting_down_ = false; }; diff --git a/src/core/lib/gprpp/string_view.h b/src/core/lib/gprpp/string_view.h index 5b03616c280..a718b79231c 100644 --- a/src/core/lib/gprpp/string_view.h +++ b/src/core/lib/gprpp/string_view.h @@ -75,6 +75,11 @@ class StringView final { : StringView(ptr, ptr == nullptr ? 0 : strlen(ptr)) {} constexpr StringView() : StringView(nullptr, 0) {} + template + StringView( + const std::basic_string, Allocator>& str) + : StringView(str.data(), str.size()) {} + constexpr const char* data() const { return ptr_; } constexpr size_t size() const { return size_; } constexpr bool empty() const { return size_ == 0; } diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 58fd8fcd78f..71977f82493 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -427,6 +427,7 @@ class AdsServiceImpl : public AdsService { const std::string version_str = "version_1"; const std::string nonce_str = "nonce_1"; grpc_core::MutexLock lock(&ads_mu_); + if (lds_ignore_) return; if (lds_response_state_ == NOT_SENT) { DiscoveryResponse response; response.set_type_url(kLdsTypeUrl); @@ -452,6 +453,7 @@ class AdsServiceImpl : public AdsService { const std::string version_str = "version_1"; const std::string nonce_str = "nonce_1"; grpc_core::MutexLock lock(&ads_mu_); + if (rds_ignore_) return; if (rds_response_state_ == NOT_SENT) { DiscoveryResponse response; response.set_type_url(kRdsTypeUrl); @@ -477,6 +479,7 @@ class AdsServiceImpl : public AdsService { const std::string version_str = "version_1"; const std::string nonce_str = "nonce_1"; grpc_core::MutexLock lock(&ads_mu_); + if (cds_ignore_) return; if (cds_response_state_ == NOT_SENT) { DiscoveryResponse response; response.set_type_url(kCdsTypeUrl); @@ -503,6 +506,7 @@ class AdsServiceImpl : public AdsService { std::vector responses_and_delays; { grpc_core::MutexLock lock(&ads_mu_); + if (eds_ignore_) return; responses_and_delays = eds_responses_and_delays_; } // Send response. @@ -538,8 +542,13 @@ class AdsServiceImpl : public AdsService { // resource names). It's not causing a big problem now but should be // fixed. bool eds_sent = false; + bool seen_first_request = false; while (!eds_sent || cds_response_state_ == SENT) { if (!stream->Read(&request)) return; + if (!seen_first_request) { + EXPECT_TRUE(request.has_node()); + seen_first_request = true; + } if (request.type_url() == kLdsTypeUrl) { HandleLdsRequest(&request, stream); } else if (request.type_url() == kRdsTypeUrl) { @@ -585,23 +594,31 @@ class AdsServiceImpl : public AdsService { lds_response_data_ = std::move(lds_response_data); } + void set_lds_ignore() { lds_ignore_ = true; } + void SetRdsResponse( std::map rds_response_data) { rds_response_data_ = std::move(rds_response_data); } + void set_rds_ignore() { rds_ignore_ = true; } + void SetCdsResponse( std::map cds_response_data) { cds_response_data_ = std::move(cds_response_data); } + void set_cds_ignore() { cds_ignore_ = true; } + void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) { grpc_core::MutexLock lock(&ads_mu_); eds_responses_and_delays_.push_back( std::make_pair(response, send_after_ms)); } + void set_eds_ignore() { eds_ignore_ = true; } + void SetLdsToUseDynamicRds() { auto listener = default_listener_; HttpConnectionManager http_connection_manager; @@ -701,17 +718,21 @@ class AdsServiceImpl : public AdsService { Listener default_listener_; std::map lds_response_data_; ResponseState lds_response_state_ = NOT_SENT; + bool lds_ignore_ = false; // RDS response data. RouteConfiguration default_route_config_; std::map rds_response_data_; ResponseState rds_response_state_ = NOT_SENT; + bool rds_ignore_ = false; // CDS response data. Cluster default_cluster_; std::map cds_response_data_; ResponseState cds_response_state_ = NOT_SENT; + bool cds_ignore_ = false; // EDS response data. std::vector eds_responses_and_delays_; + bool eds_ignore_ = false; }; class LrsServiceImpl : public LrsService { @@ -895,7 +916,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } void ResetStub(int fallback_timeout = 0, int failover_timeout = 0, - const grpc::string& expected_targets = "") { + const grpc::string& expected_targets = "", + int xds_resource_does_not_exist_timeout = 0) { ChannelArguments args; // TODO(juanlishen): Add setter to ChannelArguments. if (fallback_timeout > 0) { @@ -904,6 +926,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam { if (failover_timeout > 0) { args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout); } + if (xds_resource_does_not_exist_timeout > 0) { + args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, + xds_resource_does_not_exist_timeout); + } // If the parent channel is using the fake resolver, we inject the // response generator for the parent here, and then SetNextResolution() // will inject the xds channel's response generator via the parent's @@ -1625,6 +1651,15 @@ TEST_P(LdsTest, RouteActionHasNoCluster) { AdsServiceImpl::NACKED); } +// Tests that LDS client times out when no response received. +TEST_P(LdsTest, Timeout) { + ResetStub(0, 0, "", 500); + balancers_[0]->ads_service()->set_lds_ignore(); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + using RdsTest = BasicTest; // Tests that RDS client should send an ACK upon correct RDS response. @@ -1751,6 +1786,16 @@ TEST_P(RdsTest, RouteActionHasNoCluster) { AdsServiceImpl::NACKED); } +// Tests that RDS client times out when no response received. +TEST_P(RdsTest, Timeout) { + ResetStub(0, 0, "", 500); + balancers_[0]->ads_service()->SetLdsToUseDynamicRds(); + balancers_[0]->ads_service()->set_rds_ignore(); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + using CdsTest = BasicTest; // Tests that CDS client should send an ACK upon correct CDS response. @@ -1818,6 +1863,27 @@ TEST_P(CdsTest, WrongLrsServer) { AdsServiceImpl::NACKED); } +// Tests that CDS client times out when no response received. +TEST_P(CdsTest, Timeout) { + ResetStub(0, 0, "", 500); + balancers_[0]->ads_service()->set_cds_ignore(); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + +using EdsTest = BasicTest; + +// TODO(roth): Add tests showing that RPCs fail when EDS data is invalid. + +TEST_P(EdsTest, Timeout) { + ResetStub(0, 0, "", 500); + balancers_[0]->ads_service()->set_eds_ignore(); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); +} + using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their @@ -3031,6 +3097,13 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, TestType(true, true)), &TestTypeName); +// EDS could be tested with or without XdsResolver, but the tests would +// be the same either way, so we test it only with XdsResolver. +INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest, + ::testing::Values(TestType(true, false), + TestType(true, true)), + &TestTypeName); + INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, ::testing::Values(TestType(false, true), TestType(false, false),