Fix ADS server resource type version logic and refactor code.

pull/24434/head
Mark D. Roth 4 years ago
parent 776cf3bc69
commit 63546c0faf
  1. 542
      test/cpp/end2end/xds_end2end_test.cc

@ -540,12 +540,17 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
void UnsetResource(const std::string& type_url, const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceState& state = resource_map_[type_url][name];
++state.version;
state.resource.reset();
gpr_log(GPR_INFO, "ADS[%p]: Unsetting %s resource %s to version %u", this,
type_url.c_str(), name.c_str(), state.version);
for (SubscriptionState* subscription : state.subscriptions) {
ResourceTypeState& resource_type_state = resource_map_[type_url];
++resource_type_state.resource_type_version;
ResourceState& resource_state = resource_type_state.resource_name_map[name];
resource_state.resource_type_version =
resource_type_state.resource_type_version;
resource_state.resource.reset();
gpr_log(GPR_INFO,
"ADS[%p]: Unsetting %s resource %s; resource_type_version now %u",
this, type_url.c_str(), name.c_str(),
resource_type_state.resource_type_version);
for (SubscriptionState* subscription : resource_state.subscriptions) {
subscription->update_queue->emplace_back(type_url, name);
}
}
@ -553,12 +558,17 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceState& state = resource_map_[type_url][name];
++state.version;
state.resource = std::move(resource);
gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this,
type_url.c_str(), name.c_str(), state.version);
for (SubscriptionState* subscription : state.subscriptions) {
ResourceTypeState& resource_type_state = resource_map_[type_url];
++resource_type_state.resource_type_version;
ResourceState& resource_state = resource_type_state.resource_name_map[name];
resource_state.resource_type_version =
resource_type_state.resource_type_version;
resource_state.resource = std::move(resource);
gpr_log(GPR_INFO,
"ADS[%p]: Updating %s resource %s; resource_type_version now %u",
this, type_url.c_str(), name.c_str(),
resource_type_state.resource_type_version);
for (SubscriptionState* subscription : resource_state.subscriptions) {
subscription->update_queue->emplace_back(type_url, name);
}
}
@ -688,8 +698,6 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
// A struct representing a client's subscription to a particular resource.
struct SubscriptionState {
// Version that the client currently knows about.
int current_version = 0;
// The queue upon which to place updates when the resource is updated.
UpdateQueue* update_queue;
};
@ -700,20 +708,32 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
using SubscriptionMap =
std::map<std::string /* type_url */, SubscriptionNameMap>;
// A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods.
// - a list of subscriptions interested in this resource.
// Sent state for a given resource type.
struct SentState {
int nonce = 0;
int resource_type_version = 0;
};
// A struct representing the current state for an individual resource.
struct ResourceState {
int version = 0;
// The resource itself, if present.
absl::optional<google::protobuf::Any> resource;
// The resource type version that this resource was last updated in.
int resource_type_version = 0;
// A list of subscriptions to this resource.
std::set<SubscriptionState*> subscriptions;
};
// A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole.
// The current state for all individual resources of a given type.
using ResourceNameMap =
std::map<std::string /* resource_name */, ResourceState>;
using ResourceMap = std::map<std::string /* type_url */, ResourceNameMap>;
struct ResourceTypeState {
int resource_type_version = 0;
ResourceNameMap resource_name_map;
};
using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;
template <class RpcApi, class DiscoveryRequest, class DiscoveryResponse>
class RpcService : public RpcApi::Service {
@ -732,201 +752,101 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
} else {
parent_->seen_v3_client_ = true;
}
// Balancer shouldn't receive the call credentials metadata.
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
context->client_metadata().end());
// Take a reference of the AdsServiceImpl object, which will go
// out of scope when this request handler returns. This ensures
// that the parent won't be destroyed until this stream is complete.
std::shared_ptr<AdsServiceImpl> ads_service_impl =
parent_->shared_from_this();
// Resources (type/name pairs) that have changed since the client
// subscribed to them.
UpdateQueue update_queue;
// Resources that the client will be subscribed to keyed by resource type
// url.
SubscriptionMap subscription_map;
[&]() {
// Sent state for each resource type.
std::map<std::string /*type_url*/, SentState> sent_state_map;
// Spawn a thread to read requests from the stream.
// Requests will be delivered to this thread in a queue.
std::deque<DiscoveryRequest> requests;
bool stream_closed = false;
std::thread reader(std::bind(&RpcService::BlockingRead, this, stream,
&requests, &stream_closed));
// Main loop to process requests and updates.
while (true) {
// Boolean to keep track if the loop received any work to do: a
// request or an update; regardless whether a response was actually
// sent out.
bool did_work = false;
// Look for new requests and and decide what to handle.
absl::optional<DiscoveryResponse> response;
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (parent_->ads_done_) return;
}
// Balancer shouldn't receive the call credentials metadata.
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
context->client_metadata().end());
// Current Version map keyed by resource type url.
std::map<std::string, int> resource_type_version;
// Creating blocking thread to read from stream.
std::deque<DiscoveryRequest> requests;
bool stream_closed = false;
// Take a reference of the AdsServiceImpl object, reference will go
// out of scope after the reader thread is joined.
std::shared_ptr<AdsServiceImpl> ads_service_impl =
parent_->shared_from_this();
std::thread reader(std::bind(&RpcService::BlockingRead, this, stream,
&requests, &stream_closed));
// Main loop to look for requests and updates.
while (true) {
// Look for new requests and and decide what to handle.
absl::optional<DiscoveryResponse> response;
// Boolean to keep track if the loop received any work to do: a
// request or an update; regardless whether a response was actually
// sent out.
bool did_work = false;
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (stream_closed) break;
if (!requests.empty()) {
DiscoveryRequest request = std::move(requests.front());
requests.pop_front();
did_work = true;
gpr_log(GPR_INFO,
"ADS[%p]: Received request for type %s with content %s",
this, request.type_url().c_str(),
request.DebugString().c_str());
const std::string v3_resource_type =
TypeUrlToV3(request.type_url());
// As long as we are not in shutdown, identify ACK and NACK by
// looking for version information and comparing it to nonce (this
// server ensures they are always set to the same in a response.)
auto it =
parent_->resource_type_response_state_.find(v3_resource_type);
if (it != parent_->resource_type_response_state_.end()) {
if (!request.response_nonce().empty()) {
it->second.state =
(!request.version_info().empty() &&
request.version_info() == request.response_nonce())
? ResponseState::ACKED
: ResponseState::NACKED;
}
if (request.has_error_detail()) {
it->second.error_message = request.error_detail().message();
}
}
// As long as the test did not tell us to ignore this type of
// request, look at all the resource names.
if (parent_->resource_types_to_ignore_.find(v3_resource_type) ==
parent_->resource_types_to_ignore_.end()) {
auto& subscription_name_map =
subscription_map[v3_resource_type];
auto& resource_name_map =
parent_->resource_map_[v3_resource_type];
std::set<std::string> resources_in_current_request;
std::set<std::string> resources_added_to_response;
for (const std::string& resource_name :
request.resource_names()) {
resources_in_current_request.emplace(resource_name);
auto& subscription_state =
subscription_name_map[resource_name];
auto& resource_state = resource_name_map[resource_name];
// Subscribe if needed.
parent_->MaybeSubscribe(v3_resource_type, resource_name,
&subscription_state, &resource_state,
&update_queue);
// Send update if needed.
if (ClientNeedsResourceUpdate(resource_state,
&subscription_state)) {
gpr_log(GPR_INFO,
"ADS[%p]: Sending update for type=%s name=%s "
"version=%d",
this, request.type_url().c_str(),
resource_name.c_str(), resource_state.version);
resources_added_to_response.emplace(resource_name);
if (!response.has_value()) response.emplace();
if (resource_state.resource.has_value()) {
auto* resource = response->add_resources();
resource->CopyFrom(resource_state.resource.value());
if (is_v2_) {
resource->set_type_url(request.type_url());
}
}
} else {
gpr_log(GPR_INFO,
"ADS[%p]: client does not need update for "
"type=%s name=%s version=%d",
this, request.type_url().c_str(),
resource_name.c_str(), resource_state.version);
}
}
// Process unsubscriptions for any resource no longer
// present in the request's resource list.
parent_->ProcessUnsubscriptions(
v3_resource_type, resources_in_current_request,
&subscription_name_map, &resource_name_map);
// Send response if needed.
if (!resources_added_to_response.empty()) {
CompleteBuildingDiscoveryResponse(
v3_resource_type, request.type_url(),
++resource_type_version[v3_resource_type],
subscription_name_map, resources_added_to_response,
&response.value());
}
}
}
}
if (response.has_value()) {
gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
response->DebugString().c_str());
stream->Write(response.value());
}
response.reset();
// Look for updates and decide what to handle.
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!update_queue.empty()) {
const std::string resource_type =
std::move(update_queue.front().first);
const std::string resource_name =
std::move(update_queue.front().second);
update_queue.pop_front();
const std::string v2_resource_type = TypeUrlToV2(resource_type);
did_work = true;
gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s",
this, resource_type.c_str(), resource_name.c_str());
auto& subscription_name_map = subscription_map[resource_type];
auto& resource_name_map = parent_->resource_map_[resource_type];
auto it = subscription_name_map.find(resource_name);
if (it != subscription_name_map.end()) {
SubscriptionState& subscription_state = it->second;
ResourceState& resource_state =
resource_name_map[resource_name];
if (ClientNeedsResourceUpdate(resource_state,
&subscription_state)) {
gpr_log(
GPR_INFO,
"ADS[%p]: Sending update for type=%s name=%s version=%d",
this, resource_type.c_str(), resource_name.c_str(),
resource_state.version);
response.emplace();
if (resource_state.resource.has_value()) {
auto* resource = response->add_resources();
resource->CopyFrom(resource_state.resource.value());
if (is_v2_) {
resource->set_type_url(v2_resource_type);
}
}
CompleteBuildingDiscoveryResponse(
resource_type, v2_resource_type,
++resource_type_version[resource_type],
subscription_name_map, {resource_name},
&response.value());
}
}
}
// If the stream has been closed or our parent is being shut
// down, stop immediately.
if (stream_closed || parent_->ads_done_) break;
// Otherwise, see if there's a request to read from the queue.
if (!requests.empty()) {
DiscoveryRequest request = std::move(requests.front());
requests.pop_front();
did_work = true;
gpr_log(GPR_INFO,
"ADS[%p]: Received request for type %s with content %s",
this, request.type_url().c_str(),
request.DebugString().c_str());
const std::string v3_resource_type =
TypeUrlToV3(request.type_url());
SentState& sent_state = sent_state_map[v3_resource_type];
// Process request.
ProcessRequest(request, v3_resource_type, &update_queue,
&subscription_map, &sent_state, &response);
}
if (response.has_value()) {
gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
response->DebugString().c_str());
stream->Write(response.value());
}
if (response.has_value()) {
gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
response->DebugString().c_str());
stream->Write(response.value());
}
response.reset();
// Look for updates and decide what to handle.
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!update_queue.empty()) {
const std::string resource_type =
std::move(update_queue.front().first);
const std::string resource_name =
std::move(update_queue.front().second);
update_queue.pop_front();
did_work = true;
SentState& sent_state = sent_state_map[resource_type];
ProcessUpdate(resource_type, resource_name, &subscription_map,
&sent_state, &response);
}
// If we didn't find anything to do, delay before the next loop
// iteration; otherwise, check whether we should exit and then
// immediately continue.
gpr_timespec deadline =
grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10);
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!parent_->ads_cond_.WaitUntil(
&parent_->ads_mu_, [this] { return parent_->ads_done_; },
deadline)) {
break;
}
}
if (response.has_value()) {
gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
response->DebugString().c_str());
stream->Write(response.value());
}
// If we didn't find anything to do, delay before the next loop
// iteration; otherwise, check whether we should exit and then
// immediately continue.
gpr_timespec deadline =
grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10);
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!parent_->ads_cond_.WaitUntil(
&parent_->ads_mu_, [this] { return parent_->ads_done_; },
deadline)) {
break;
}
}
reader.join();
}();
}
// Done with main loop. Clean up before returning.
// Join reader thread.
reader.join();
// Clean up any subscriptions that were still active when the call
// finished.
{
@ -937,8 +857,9 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
for (auto& q : subscription_name_map) {
const std::string& resource_name = q.first;
SubscriptionState& subscription_state = q.second;
ResourceState& resource_state =
parent_->resource_map_[type_url][resource_name];
ResourceNameMap& resource_name_map =
parent_->resource_map_[type_url].resource_name_map;
ResourceState& resource_state = resource_name_map[resource_name];
resource_state.subscriptions.erase(&subscription_state);
}
}
@ -949,20 +870,139 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
private:
static std::string TypeUrlToV2(const std::string& resource_type) {
if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl;
if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl;
if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl;
if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl;
return resource_type;
// Processes a response read from the client.
// Populates response if needed.
void ProcessRequest(const DiscoveryRequest& request,
const std::string& v3_resource_type,
UpdateQueue* update_queue,
SubscriptionMap* subscription_map,
SentState* sent_state,
absl::optional<DiscoveryResponse>* response) {
// Determine client resource type version.
int client_resource_type_version = 0;
if (!request.version_info().empty()) {
GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
&client_resource_type_version));
}
// Check the nonce sent by the client, if any.
// (This will be absent on the first request on a stream.)
if (!request.response_nonce().empty()) {
int client_nonce;
GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
// Ignore requests with stale nonces.
if (client_nonce < sent_state->nonce) return;
// Check for ACK or NACK.
auto it = parent_->resource_type_response_state_.find(v3_resource_type);
if (it != parent_->resource_type_response_state_.end()) {
if (client_resource_type_version ==
sent_state->resource_type_version) {
it->second.state = ResponseState::ACKED;
it->second.error_message.clear();
gpr_log(GPR_INFO,
"ADS[%p]: client ACKed resource_type=%s version=%s", this,
request.type_url().c_str(), request.version_info().c_str());
} else {
it->second.state = ResponseState::NACKED;
it->second.error_message = request.error_detail().message();
gpr_log(GPR_INFO,
"ADS[%p]: client NACKed resource_type=%s version=%s: %s",
this, request.type_url().c_str(),
request.version_info().c_str(),
it->second.error_message.c_str());
}
}
}
// Ignore resource types as requested by tests.
if (parent_->resource_types_to_ignore_.find(v3_resource_type) !=
parent_->resource_types_to_ignore_.end()) {
return;
}
// Look at all the resource names in the request.
auto& subscription_name_map = (*subscription_map)[v3_resource_type];
auto& resource_type_state = parent_->resource_map_[v3_resource_type];
auto& resource_name_map = resource_type_state.resource_name_map;
std::set<std::string> resources_in_current_request;
std::set<std::string> resources_added_to_response;
for (const std::string& resource_name : request.resource_names()) {
resources_in_current_request.emplace(resource_name);
auto& subscription_state = subscription_name_map[resource_name];
auto& resource_state = resource_name_map[resource_name];
// Subscribe if needed.
// Send the resource in the response if either (a) this is
// a new subscription or (b) there is an updated version of
// this resource to send.
if (parent_->MaybeSubscribe(v3_resource_type, resource_name,
&subscription_state, &resource_state,
update_queue) ||
ClientNeedsResourceUpdate(resource_type_state, resource_state,
client_resource_type_version,
&subscription_state)) {
gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
request.type_url().c_str(), resource_name.c_str());
resources_added_to_response.emplace(resource_name);
if (!response->has_value()) response->emplace();
if (resource_state.resource.has_value()) {
auto* resource = (*response)->add_resources();
resource->CopyFrom(resource_state.resource.value());
if (is_v2_) {
resource->set_type_url(request.type_url());
}
}
} else {
gpr_log(GPR_INFO,
"ADS[%p]: client does not need update for type=%s name=%s",
this, request.type_url().c_str(), resource_name.c_str());
}
}
// Process unsubscriptions for any resource no longer
// present in the request's resource list.
parent_->ProcessUnsubscriptions(
v3_resource_type, resources_in_current_request,
&subscription_name_map, &resource_name_map);
// Construct response if needed.
if (!resources_added_to_response.empty()) {
CompleteBuildingDiscoveryResponse(
v3_resource_type, request.type_url(),
resource_type_state.resource_type_version, subscription_name_map,
resources_added_to_response, sent_state, &response->value());
}
}
static std::string TypeUrlToV3(const std::string& resource_type) {
if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl;
if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl;
if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl;
if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl;
return resource_type;
// Processes a resource update from the test.
// Populates response if needed.
void ProcessUpdate(const std::string& resource_type,
const std::string& resource_name,
SubscriptionMap* subscription_map, SentState* sent_state,
absl::optional<DiscoveryResponse>* response) {
const std::string v2_resource_type = TypeUrlToV2(resource_type);
gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", this,
resource_type.c_str(), resource_name.c_str());
auto& subscription_name_map = (*subscription_map)[resource_type];
auto& resource_type_state = parent_->resource_map_[resource_type];
auto& resource_name_map = resource_type_state.resource_name_map;
auto it = subscription_name_map.find(resource_name);
if (it != subscription_name_map.end()) {
SubscriptionState& subscription_state = it->second;
ResourceState& resource_state = resource_name_map[resource_name];
if (ClientNeedsResourceUpdate(resource_type_state, resource_state,
sent_state->resource_type_version,
&subscription_state)) {
gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
resource_type.c_str(), resource_name.c_str());
response->emplace();
if (resource_state.resource.has_value()) {
auto* resource = (*response)->add_resources();
resource->CopyFrom(resource_state.resource.value());
if (is_v2_) {
resource->set_type_url(v2_resource_type);
}
}
CompleteBuildingDiscoveryResponse(
resource_type, v2_resource_type,
resource_type_state.resource_type_version, subscription_name_map,
{resource_name}, sent_state, &response->value());
}
}
}
// Starting a thread to do blocking read on the stream until cancel.
@ -989,29 +1029,21 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
*stream_closed = true;
}
static void CheckBuildVersion(
const ::envoy::api::v2::DiscoveryRequest& request) {
EXPECT_FALSE(request.node().build_version().empty());
}
static void CheckBuildVersion(
const ::envoy::service::discovery::v3::DiscoveryRequest& request) {}
// Completing the building a DiscoveryResponse by adding common information
// for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const std::string& v2_resource_type,
const int version, const SubscriptionNameMap& subscription_name_map,
const std::set<std::string>& resources_added_to_response,
DiscoveryResponse* response) {
SentState* sent_state, DiscoveryResponse* response) {
auto& response_state =
parent_->resource_type_response_state_[resource_type];
if (response_state.state == ResponseState::NOT_SENT) {
response_state.state = ResponseState::SENT;
}
response->set_type_url(is_v2_ ? v2_resource_type : resource_type);
response->set_version_info(absl::StrCat(version));
response->set_nonce(absl::StrCat(version));
response->set_version_info(std::to_string(version));
response->set_nonce(std::to_string(++sent_state->nonce));
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones)
@ -1019,8 +1051,10 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
const std::string& resource_name = p.first;
if (resources_added_to_response.find(resource_name) ==
resources_added_to_response.end()) {
ResourceNameMap& resource_name_map =
parent_->resource_map_[resource_type].resource_name_map;
const ResourceState& resource_state =
parent_->resource_map_[resource_type][resource_name];
resource_name_map[resource_name];
if (resource_state.resource.has_value()) {
auto* resource = response->add_resources();
resource->CopyFrom(resource_state.resource.value());
@ -1031,39 +1065,65 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
}
}
sent_state->resource_type_version = version;
}
static std::string TypeUrlToV2(const std::string& resource_type) {
if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl;
if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl;
if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl;
if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl;
return resource_type;
}
static std::string TypeUrlToV3(const std::string& resource_type) {
if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl;
if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl;
if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl;
if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl;
return resource_type;
}
static void CheckBuildVersion(
const ::envoy::api::v2::DiscoveryRequest& request) {
EXPECT_FALSE(request.node().build_version().empty());
}
static void CheckBuildVersion(
const ::envoy::service::discovery::v3::DiscoveryRequest& request) {}
AdsServiceImpl* parent_;
const bool is_v2_;
};
// Checks whether the client needs to receive a newer version of
// the resource. If so, updates subscription_state->current_version and
// returns true.
static bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
SubscriptionState* subscription_state) {
if (subscription_state->current_version < resource_state.version) {
subscription_state->current_version = resource_state.version;
return true;
}
return false;
// the resource.
static bool ClientNeedsResourceUpdate(
const ResourceTypeState& resource_type_state,
const ResourceState& resource_state, int client_resource_type_version,
SubscriptionState* subscription_state) {
return client_resource_type_version <
resource_type_state.resource_type_version &&
resource_state.resource_type_version <=
resource_type_state.resource_type_version;
}
// Subscribes to a resource if not already subscribed:
// 1. Sets the update_queue field in subscription_state.
// 2. Adds subscription_state to resource_state->subscriptions.
void MaybeSubscribe(const std::string& resource_type,
bool MaybeSubscribe(const std::string& resource_type,
const std::string& resource_name,
SubscriptionState* subscription_state,
ResourceState* resource_state,
UpdateQueue* update_queue) {
// The update_queue will be null if we were not previously subscribed.
if (subscription_state->update_queue != nullptr) return;
if (subscription_state->update_queue != nullptr) return false;
subscription_state->update_queue = update_queue;
resource_state->subscriptions.emplace(subscription_state);
gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
return true;
}
// Removes subscriptions for resources no longer present in the

Loading…
Cancel
Save