From 63546c0fafd15640551e507b74c74969964c21b0 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 15 Oct 2020 16:22:20 -0700 Subject: [PATCH] Fix ADS server resource type version logic and refactor code. --- test/cpp/end2end/xds_end2end_test.cc | 542 +++++++++++++++------------ 1 file changed, 301 insertions(+), 241 deletions(-) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 4310bd4bf4f..a4f5b2c4ba6 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -540,12 +540,17 @@ class AdsServiceImpl : public std::enable_shared_from_this { void UnsetResource(const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); - ResourceState& state = resource_map_[type_url][name]; - ++state.version; - state.resource.reset(); - gpr_log(GPR_INFO, "ADS[%p]: Unsetting %s resource %s to version %u", this, - type_url.c_str(), name.c_str(), state.version); - for (SubscriptionState* subscription : state.subscriptions) { + ResourceTypeState& resource_type_state = resource_map_[type_url]; + ++resource_type_state.resource_type_version; + ResourceState& resource_state = resource_type_state.resource_name_map[name]; + resource_state.resource_type_version = + resource_type_state.resource_type_version; + resource_state.resource.reset(); + gpr_log(GPR_INFO, + "ADS[%p]: Unsetting %s resource %s; resource_type_version now %u", + this, type_url.c_str(), name.c_str(), + resource_type_state.resource_type_version); + for (SubscriptionState* subscription : resource_state.subscriptions) { subscription->update_queue->emplace_back(type_url, name); } } @@ -553,12 +558,17 @@ class AdsServiceImpl : public std::enable_shared_from_this { void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); - ResourceState& state = resource_map_[type_url][name]; - ++state.version; - state.resource = std::move(resource); - gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this, - type_url.c_str(), name.c_str(), state.version); - for (SubscriptionState* subscription : state.subscriptions) { + ResourceTypeState& resource_type_state = resource_map_[type_url]; + ++resource_type_state.resource_type_version; + ResourceState& resource_state = resource_type_state.resource_name_map[name]; + resource_state.resource_type_version = + resource_type_state.resource_type_version; + resource_state.resource = std::move(resource); + gpr_log(GPR_INFO, + "ADS[%p]: Updating %s resource %s; resource_type_version now %u", + this, type_url.c_str(), name.c_str(), + resource_type_state.resource_type_version); + for (SubscriptionState* subscription : resource_state.subscriptions) { subscription->update_queue->emplace_back(type_url, name); } } @@ -688,8 +698,6 @@ class AdsServiceImpl : public std::enable_shared_from_this { // A struct representing a client's subscription to a particular resource. struct SubscriptionState { - // Version that the client currently knows about. - int current_version = 0; // The queue upon which to place updates when the resource is updated. UpdateQueue* update_queue; }; @@ -700,20 +708,32 @@ class AdsServiceImpl : public std::enable_shared_from_this { using SubscriptionMap = std::map; - // A struct representing the current state for a resource: - // - the version of the resource that is set by the SetResource() methods. - // - a list of subscriptions interested in this resource. + // Sent state for a given resource type. + struct SentState { + int nonce = 0; + int resource_type_version = 0; + }; + + // A struct representing the current state for an individual resource. struct ResourceState { - int version = 0; + // The resource itself, if present. absl::optional resource; + // The resource type version that this resource was last updated in. + int resource_type_version = 0; + // A list of subscriptions to this resource. std::set subscriptions; }; - // A struct representing the current state for all resources: - // LDS, CDS, EDS, and RDS for the class as a whole. + // The current state for all individual resources of a given type. using ResourceNameMap = std::map; - using ResourceMap = std::map; + + struct ResourceTypeState { + int resource_type_version = 0; + ResourceNameMap resource_name_map; + }; + + using ResourceMap = std::map; template class RpcService : public RpcApi::Service { @@ -732,201 +752,101 @@ class AdsServiceImpl : public std::enable_shared_from_this { } else { parent_->seen_v3_client_ = true; } + // Balancer shouldn't receive the call credentials metadata. + EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), + context->client_metadata().end()); + // Take a reference of the AdsServiceImpl object, which will go + // out of scope when this request handler returns. This ensures + // that the parent won't be destroyed until this stream is complete. + std::shared_ptr ads_service_impl = + parent_->shared_from_this(); // Resources (type/name pairs) that have changed since the client // subscribed to them. UpdateQueue update_queue; // Resources that the client will be subscribed to keyed by resource type // url. SubscriptionMap subscription_map; - [&]() { + // Sent state for each resource type. + std::map sent_state_map; + // Spawn a thread to read requests from the stream. + // Requests will be delivered to this thread in a queue. + std::deque requests; + bool stream_closed = false; + std::thread reader(std::bind(&RpcService::BlockingRead, this, stream, + &requests, &stream_closed)); + // Main loop to process requests and updates. + while (true) { + // Boolean to keep track if the loop received any work to do: a + // request or an update; regardless whether a response was actually + // sent out. + bool did_work = false; + // Look for new requests and and decide what to handle. + absl::optional response; { grpc_core::MutexLock lock(&parent_->ads_mu_); - if (parent_->ads_done_) return; - } - // Balancer shouldn't receive the call credentials metadata. - EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), - context->client_metadata().end()); - // Current Version map keyed by resource type url. - std::map resource_type_version; - // Creating blocking thread to read from stream. - std::deque requests; - bool stream_closed = false; - // Take a reference of the AdsServiceImpl object, reference will go - // out of scope after the reader thread is joined. - std::shared_ptr ads_service_impl = - parent_->shared_from_this(); - std::thread reader(std::bind(&RpcService::BlockingRead, this, stream, - &requests, &stream_closed)); - // Main loop to look for requests and updates. - while (true) { - // Look for new requests and and decide what to handle. - absl::optional response; - // Boolean to keep track if the loop received any work to do: a - // request or an update; regardless whether a response was actually - // sent out. - bool did_work = false; - { - grpc_core::MutexLock lock(&parent_->ads_mu_); - if (stream_closed) break; - if (!requests.empty()) { - DiscoveryRequest request = std::move(requests.front()); - requests.pop_front(); - did_work = true; - gpr_log(GPR_INFO, - "ADS[%p]: Received request for type %s with content %s", - this, request.type_url().c_str(), - request.DebugString().c_str()); - const std::string v3_resource_type = - TypeUrlToV3(request.type_url()); - // As long as we are not in shutdown, identify ACK and NACK by - // looking for version information and comparing it to nonce (this - // server ensures they are always set to the same in a response.) - auto it = - parent_->resource_type_response_state_.find(v3_resource_type); - if (it != parent_->resource_type_response_state_.end()) { - if (!request.response_nonce().empty()) { - it->second.state = - (!request.version_info().empty() && - request.version_info() == request.response_nonce()) - ? ResponseState::ACKED - : ResponseState::NACKED; - } - if (request.has_error_detail()) { - it->second.error_message = request.error_detail().message(); - } - } - // As long as the test did not tell us to ignore this type of - // request, look at all the resource names. - if (parent_->resource_types_to_ignore_.find(v3_resource_type) == - parent_->resource_types_to_ignore_.end()) { - auto& subscription_name_map = - subscription_map[v3_resource_type]; - auto& resource_name_map = - parent_->resource_map_[v3_resource_type]; - std::set resources_in_current_request; - std::set resources_added_to_response; - for (const std::string& resource_name : - request.resource_names()) { - resources_in_current_request.emplace(resource_name); - auto& subscription_state = - subscription_name_map[resource_name]; - auto& resource_state = resource_name_map[resource_name]; - // Subscribe if needed. - parent_->MaybeSubscribe(v3_resource_type, resource_name, - &subscription_state, &resource_state, - &update_queue); - // Send update if needed. - if (ClientNeedsResourceUpdate(resource_state, - &subscription_state)) { - gpr_log(GPR_INFO, - "ADS[%p]: Sending update for type=%s name=%s " - "version=%d", - this, request.type_url().c_str(), - resource_name.c_str(), resource_state.version); - resources_added_to_response.emplace(resource_name); - if (!response.has_value()) response.emplace(); - if (resource_state.resource.has_value()) { - auto* resource = response->add_resources(); - resource->CopyFrom(resource_state.resource.value()); - if (is_v2_) { - resource->set_type_url(request.type_url()); - } - } - } else { - gpr_log(GPR_INFO, - "ADS[%p]: client does not need update for " - "type=%s name=%s version=%d", - this, request.type_url().c_str(), - resource_name.c_str(), resource_state.version); - } - } - // Process unsubscriptions for any resource no longer - // present in the request's resource list. - parent_->ProcessUnsubscriptions( - v3_resource_type, resources_in_current_request, - &subscription_name_map, &resource_name_map); - // Send response if needed. - if (!resources_added_to_response.empty()) { - CompleteBuildingDiscoveryResponse( - v3_resource_type, request.type_url(), - ++resource_type_version[v3_resource_type], - subscription_name_map, resources_added_to_response, - &response.value()); - } - } - } - } - if (response.has_value()) { - gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, - response->DebugString().c_str()); - stream->Write(response.value()); - } - response.reset(); - // Look for updates and decide what to handle. - { - grpc_core::MutexLock lock(&parent_->ads_mu_); - if (!update_queue.empty()) { - const std::string resource_type = - std::move(update_queue.front().first); - const std::string resource_name = - std::move(update_queue.front().second); - update_queue.pop_front(); - const std::string v2_resource_type = TypeUrlToV2(resource_type); - did_work = true; - gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", - this, resource_type.c_str(), resource_name.c_str()); - auto& subscription_name_map = subscription_map[resource_type]; - auto& resource_name_map = parent_->resource_map_[resource_type]; - auto it = subscription_name_map.find(resource_name); - if (it != subscription_name_map.end()) { - SubscriptionState& subscription_state = it->second; - ResourceState& resource_state = - resource_name_map[resource_name]; - if (ClientNeedsResourceUpdate(resource_state, - &subscription_state)) { - gpr_log( - GPR_INFO, - "ADS[%p]: Sending update for type=%s name=%s version=%d", - this, resource_type.c_str(), resource_name.c_str(), - resource_state.version); - response.emplace(); - if (resource_state.resource.has_value()) { - auto* resource = response->add_resources(); - resource->CopyFrom(resource_state.resource.value()); - if (is_v2_) { - resource->set_type_url(v2_resource_type); - } - } - CompleteBuildingDiscoveryResponse( - resource_type, v2_resource_type, - ++resource_type_version[resource_type], - subscription_name_map, {resource_name}, - &response.value()); - } - } - } + // If the stream has been closed or our parent is being shut + // down, stop immediately. + if (stream_closed || parent_->ads_done_) break; + // Otherwise, see if there's a request to read from the queue. + if (!requests.empty()) { + DiscoveryRequest request = std::move(requests.front()); + requests.pop_front(); + did_work = true; + gpr_log(GPR_INFO, + "ADS[%p]: Received request for type %s with content %s", + this, request.type_url().c_str(), + request.DebugString().c_str()); + const std::string v3_resource_type = + TypeUrlToV3(request.type_url()); + SentState& sent_state = sent_state_map[v3_resource_type]; + // Process request. + ProcessRequest(request, v3_resource_type, &update_queue, + &subscription_map, &sent_state, &response); } - if (response.has_value()) { - gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, - response->DebugString().c_str()); - stream->Write(response.value()); + } + if (response.has_value()) { + gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, + response->DebugString().c_str()); + stream->Write(response.value()); + } + response.reset(); + // Look for updates and decide what to handle. + { + grpc_core::MutexLock lock(&parent_->ads_mu_); + if (!update_queue.empty()) { + const std::string resource_type = + std::move(update_queue.front().first); + const std::string resource_name = + std::move(update_queue.front().second); + update_queue.pop_front(); + did_work = true; + SentState& sent_state = sent_state_map[resource_type]; + ProcessUpdate(resource_type, resource_name, &subscription_map, + &sent_state, &response); } - // If we didn't find anything to do, delay before the next loop - // iteration; otherwise, check whether we should exit and then - // immediately continue. - gpr_timespec deadline = - grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10); - { - grpc_core::MutexLock lock(&parent_->ads_mu_); - if (!parent_->ads_cond_.WaitUntil( - &parent_->ads_mu_, [this] { return parent_->ads_done_; }, - deadline)) { - break; - } + } + if (response.has_value()) { + gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, + response->DebugString().c_str()); + stream->Write(response.value()); + } + // If we didn't find anything to do, delay before the next loop + // iteration; otherwise, check whether we should exit and then + // immediately continue. + gpr_timespec deadline = + grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10); + { + grpc_core::MutexLock lock(&parent_->ads_mu_); + if (!parent_->ads_cond_.WaitUntil( + &parent_->ads_mu_, [this] { return parent_->ads_done_; }, + deadline)) { + break; } } - reader.join(); - }(); + } + // Done with main loop. Clean up before returning. + // Join reader thread. + reader.join(); // Clean up any subscriptions that were still active when the call // finished. { @@ -937,8 +857,9 @@ class AdsServiceImpl : public std::enable_shared_from_this { for (auto& q : subscription_name_map) { const std::string& resource_name = q.first; SubscriptionState& subscription_state = q.second; - ResourceState& resource_state = - parent_->resource_map_[type_url][resource_name]; + ResourceNameMap& resource_name_map = + parent_->resource_map_[type_url].resource_name_map; + ResourceState& resource_state = resource_name_map[resource_name]; resource_state.subscriptions.erase(&subscription_state); } } @@ -949,20 +870,139 @@ class AdsServiceImpl : public std::enable_shared_from_this { } private: - static std::string TypeUrlToV2(const std::string& resource_type) { - if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl; - if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl; - if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl; - if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl; - return resource_type; + // Processes a response read from the client. + // Populates response if needed. + void ProcessRequest(const DiscoveryRequest& request, + const std::string& v3_resource_type, + UpdateQueue* update_queue, + SubscriptionMap* subscription_map, + SentState* sent_state, + absl::optional* response) { + // Determine client resource type version. + int client_resource_type_version = 0; + if (!request.version_info().empty()) { + GPR_ASSERT(absl::SimpleAtoi(request.version_info(), + &client_resource_type_version)); + } + // Check the nonce sent by the client, if any. + // (This will be absent on the first request on a stream.) + if (!request.response_nonce().empty()) { + int client_nonce; + GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce)); + // Ignore requests with stale nonces. + if (client_nonce < sent_state->nonce) return; + // Check for ACK or NACK. + auto it = parent_->resource_type_response_state_.find(v3_resource_type); + if (it != parent_->resource_type_response_state_.end()) { + if (client_resource_type_version == + sent_state->resource_type_version) { + it->second.state = ResponseState::ACKED; + it->second.error_message.clear(); + gpr_log(GPR_INFO, + "ADS[%p]: client ACKed resource_type=%s version=%s", this, + request.type_url().c_str(), request.version_info().c_str()); + } else { + it->second.state = ResponseState::NACKED; + it->second.error_message = request.error_detail().message(); + gpr_log(GPR_INFO, + "ADS[%p]: client NACKed resource_type=%s version=%s: %s", + this, request.type_url().c_str(), + request.version_info().c_str(), + it->second.error_message.c_str()); + } + } + } + // Ignore resource types as requested by tests. + if (parent_->resource_types_to_ignore_.find(v3_resource_type) != + parent_->resource_types_to_ignore_.end()) { + return; + } + // Look at all the resource names in the request. + auto& subscription_name_map = (*subscription_map)[v3_resource_type]; + auto& resource_type_state = parent_->resource_map_[v3_resource_type]; + auto& resource_name_map = resource_type_state.resource_name_map; + std::set resources_in_current_request; + std::set resources_added_to_response; + for (const std::string& resource_name : request.resource_names()) { + resources_in_current_request.emplace(resource_name); + auto& subscription_state = subscription_name_map[resource_name]; + auto& resource_state = resource_name_map[resource_name]; + // Subscribe if needed. + // Send the resource in the response if either (a) this is + // a new subscription or (b) there is an updated version of + // this resource to send. + if (parent_->MaybeSubscribe(v3_resource_type, resource_name, + &subscription_state, &resource_state, + update_queue) || + ClientNeedsResourceUpdate(resource_type_state, resource_state, + client_resource_type_version, + &subscription_state)) { + gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this, + request.type_url().c_str(), resource_name.c_str()); + resources_added_to_response.emplace(resource_name); + if (!response->has_value()) response->emplace(); + if (resource_state.resource.has_value()) { + auto* resource = (*response)->add_resources(); + resource->CopyFrom(resource_state.resource.value()); + if (is_v2_) { + resource->set_type_url(request.type_url()); + } + } + } else { + gpr_log(GPR_INFO, + "ADS[%p]: client does not need update for type=%s name=%s", + this, request.type_url().c_str(), resource_name.c_str()); + } + } + // Process unsubscriptions for any resource no longer + // present in the request's resource list. + parent_->ProcessUnsubscriptions( + v3_resource_type, resources_in_current_request, + &subscription_name_map, &resource_name_map); + // Construct response if needed. + if (!resources_added_to_response.empty()) { + CompleteBuildingDiscoveryResponse( + v3_resource_type, request.type_url(), + resource_type_state.resource_type_version, subscription_name_map, + resources_added_to_response, sent_state, &response->value()); + } } - static std::string TypeUrlToV3(const std::string& resource_type) { - if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl; - if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl; - if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl; - if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl; - return resource_type; + // Processes a resource update from the test. + // Populates response if needed. + void ProcessUpdate(const std::string& resource_type, + const std::string& resource_name, + SubscriptionMap* subscription_map, SentState* sent_state, + absl::optional* response) { + const std::string v2_resource_type = TypeUrlToV2(resource_type); + gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", this, + resource_type.c_str(), resource_name.c_str()); + auto& subscription_name_map = (*subscription_map)[resource_type]; + auto& resource_type_state = parent_->resource_map_[resource_type]; + auto& resource_name_map = resource_type_state.resource_name_map; + auto it = subscription_name_map.find(resource_name); + if (it != subscription_name_map.end()) { + SubscriptionState& subscription_state = it->second; + ResourceState& resource_state = resource_name_map[resource_name]; + if (ClientNeedsResourceUpdate(resource_type_state, resource_state, + sent_state->resource_type_version, + &subscription_state)) { + gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this, + resource_type.c_str(), resource_name.c_str()); + response->emplace(); + if (resource_state.resource.has_value()) { + auto* resource = (*response)->add_resources(); + resource->CopyFrom(resource_state.resource.value()); + if (is_v2_) { + resource->set_type_url(v2_resource_type); + } + } + CompleteBuildingDiscoveryResponse( + resource_type, v2_resource_type, + resource_type_state.resource_type_version, subscription_name_map, + {resource_name}, sent_state, &response->value()); + } + } } // Starting a thread to do blocking read on the stream until cancel. @@ -989,29 +1029,21 @@ class AdsServiceImpl : public std::enable_shared_from_this { *stream_closed = true; } - static void CheckBuildVersion( - const ::envoy::api::v2::DiscoveryRequest& request) { - EXPECT_FALSE(request.node().build_version().empty()); - } - - static void CheckBuildVersion( - const ::envoy::service::discovery::v3::DiscoveryRequest& request) {} - // Completing the building a DiscoveryResponse by adding common information // for all resources and by adding all subscribed resources for LDS and CDS. void CompleteBuildingDiscoveryResponse( const std::string& resource_type, const std::string& v2_resource_type, const int version, const SubscriptionNameMap& subscription_name_map, const std::set& resources_added_to_response, - DiscoveryResponse* response) { + SentState* sent_state, DiscoveryResponse* response) { auto& response_state = parent_->resource_type_response_state_[resource_type]; if (response_state.state == ResponseState::NOT_SENT) { response_state.state = ResponseState::SENT; } response->set_type_url(is_v2_ ? v2_resource_type : resource_type); - response->set_version_info(absl::StrCat(version)); - response->set_nonce(absl::StrCat(version)); + response->set_version_info(std::to_string(version)); + response->set_nonce(std::to_string(++sent_state->nonce)); if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { // For LDS and CDS we must send back all subscribed resources // (even the unchanged ones) @@ -1019,8 +1051,10 @@ class AdsServiceImpl : public std::enable_shared_from_this { const std::string& resource_name = p.first; if (resources_added_to_response.find(resource_name) == resources_added_to_response.end()) { + ResourceNameMap& resource_name_map = + parent_->resource_map_[resource_type].resource_name_map; const ResourceState& resource_state = - parent_->resource_map_[resource_type][resource_name]; + resource_name_map[resource_name]; if (resource_state.resource.has_value()) { auto* resource = response->add_resources(); resource->CopyFrom(resource_state.resource.value()); @@ -1031,39 +1065,65 @@ class AdsServiceImpl : public std::enable_shared_from_this { } } } + sent_state->resource_type_version = version; } + static std::string TypeUrlToV2(const std::string& resource_type) { + if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl; + if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl; + if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl; + if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl; + return resource_type; + } + + static std::string TypeUrlToV3(const std::string& resource_type) { + if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl; + if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl; + if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl; + if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl; + return resource_type; + } + + static void CheckBuildVersion( + const ::envoy::api::v2::DiscoveryRequest& request) { + EXPECT_FALSE(request.node().build_version().empty()); + } + + static void CheckBuildVersion( + const ::envoy::service::discovery::v3::DiscoveryRequest& request) {} + AdsServiceImpl* parent_; const bool is_v2_; }; // Checks whether the client needs to receive a newer version of - // the resource. If so, updates subscription_state->current_version and - // returns true. - static bool ClientNeedsResourceUpdate(const ResourceState& resource_state, - SubscriptionState* subscription_state) { - if (subscription_state->current_version < resource_state.version) { - subscription_state->current_version = resource_state.version; - return true; - } - return false; + // the resource. + static bool ClientNeedsResourceUpdate( + const ResourceTypeState& resource_type_state, + const ResourceState& resource_state, int client_resource_type_version, + SubscriptionState* subscription_state) { + return client_resource_type_version < + resource_type_state.resource_type_version && + resource_state.resource_type_version <= + resource_type_state.resource_type_version; } // Subscribes to a resource if not already subscribed: // 1. Sets the update_queue field in subscription_state. // 2. Adds subscription_state to resource_state->subscriptions. - void MaybeSubscribe(const std::string& resource_type, + bool MaybeSubscribe(const std::string& resource_type, const std::string& resource_name, SubscriptionState* subscription_state, ResourceState* resource_state, UpdateQueue* update_queue) { // The update_queue will be null if we were not previously subscribed. - if (subscription_state->update_queue != nullptr) return; + if (subscription_state->update_queue != nullptr) return false; subscription_state->update_queue = update_queue; resource_state->subscriptions.emplace(subscription_state); gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p", this, resource_type.c_str(), resource_name.c_str(), &subscription_state); + return true; } // Removes subscriptions for resources no longer present in the