xds: accept resources wrapped in a Resource message (#29090)

* xds: accept resources wrapped in a Resource message

* fix v2 proto name
pull/29159/head
Mark D. Roth 3 years ago committed by GitHub
parent 4b9c915e81
commit 7b4a5fcba5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/core/ext/xds/xds_api.cc
  2. 45
      src/proto/grpc/testing/xds/v3/discovery.proto
  3. 24
      test/cpp/end2end/xds/xds_end2end_test.cc
  4. 19
      test/cpp/end2end/xds/xds_server.h

@ -326,6 +326,9 @@ grpc_slice XdsApi::CreateAdsRequest(
arena.ptr());
PopulateNode(context, node_, build_version_, user_agent_name_,
user_agent_version_, node_msg);
envoy_config_core_v3_Node_add_client_features(
node_msg, upb_StringView_FromString("xds.config.resource-in-sotw"),
context.arena);
}
// Add resource_names.
for (const std::string& resource_name : resource_names) {
@ -399,6 +402,23 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server,
"type.googleapis.com/");
absl::string_view serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
// Unwrap Resource messages, if so wrapped.
if (type_url == "envoy.api.v2.Resource" ||
type_url == "envoy.service.discovery.v3.Resource") {
const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse(
serialized_resource.data(), serialized_resource.size(), arena.ptr());
if (resource_wrapper == nullptr) {
return absl::InvalidArgumentError(
"Can't decode Resource proto wrapper");
}
const auto* resource =
envoy_service_discovery_v3_Resource_resource(resource_wrapper);
type_url = absl::StripPrefix(
UpbStringToAbsl(google_protobuf_Any_type_url(resource)),
"type.googleapis.com/");
serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resource));
}
parser->ParseResource(context, i, type_url, serialized_resource);
}
return absl::OkStatus();

@ -21,6 +21,7 @@ package envoy.service.discovery.v3;
import "src/proto/grpc/testing/xds/v3/base.proto";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
message Status {
// The status code, which should be an enum value of [google.rpc.Code][].
@ -120,3 +121,47 @@ message DiscoveryResponse {
// required for non-stream based xDS implementations.
string nonce = 5;
}
// [#next-free-field: 8]
message Resource {
// Cache control properties for the resource.
// [#not-implemented-hide:]
message CacheControl {
// If true, xDS proxies may not cache this resource.
// Note that this does not apply to clients other than xDS proxies, which must cache resources
// for their own use, regardless of the value of this field.
bool do_not_cache = 1;
}
// The resource's name, to distinguish it from others of the same type of resource.
string name = 3;
// The aliases are a list of other names that this resource can go by.
repeated string aliases = 4;
// The resource level version. It allows xDS to track the state of individual
// resources.
string version = 1;
// The resource being tracked.
google.protobuf.Any resource = 2;
// Time-to-live value for the resource. For each resource, a timer is started. The timer is
// reset each time the resource is received with a new TTL. If the resource is received with
// no TTL set, the timer is removed for the resource. Upon expiration of the timer, the
// configuration for the resource will be removed.
//
// The TTL can be refreshed or changed by sending a response that doesn't change the resource
// version. In this case the resource field does not need to be populated, which allows for
// light-weight "heartbeat" updates to keep a resource with a TTL alive.
//
// The TTL feature is meant to support configurations that should be removed in the event of
// a management server failure. For example, the feature may be used for fault injection
// testing where the fault injection should be terminated in the event that Envoy loses contact
// with the management server.
google.protobuf.Duration ttl = 6;
// Cache control properties for the resource.
// [#not-implemented-hide:]
CacheControl cache_control = 7;
}

@ -1996,6 +1996,30 @@ TEST_P(BasicTest, Vanilla) {
channel_->GetLoadBalancingPolicyName());
}
// Tests that the client can handle resource wrapped in a Resource message.
TEST_P(BasicTest, ResourceWrappedInResourceMessage) {
balancer_->ads_service()->set_wrap_resources(true);
const size_t kNumRpcsPerAddress = 100;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
// Check LB policy name for the channel.
EXPECT_EQ("xds_cluster_manager_experimental",
channel_->GetLoadBalancingPolicyName());
}
TEST_P(BasicTest, IgnoresUnhealthyEndpoints) {
const size_t kNumRpcsPerAddress = 100;
auto endpoints = CreateEndpointsForBackends();

@ -37,6 +37,7 @@
#include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
#include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
@ -93,6 +94,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
return &v3_rpc_service_;
}
void set_wrap_resources(bool wrap_resources) {
grpc_core::MutexLock lock(&ads_mu_);
wrap_resources_ = wrap_resources;
}
// Sets a resource to a particular value, overwriting any previous value.
void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name);
@ -471,6 +477,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
if (is_v2_) {
resource->set_type_url(request.type_url());
}
if (parent_->wrap_resources_) {
envoy::service::discovery::v3::Resource resource_wrapper;
*resource_wrapper.mutable_resource() = std::move(*resource);
resource->PackFrom(resource_wrapper);
}
}
} else {
gpr_log(GPR_INFO,
@ -537,9 +548,10 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
while (stream->Read(&request)) {
if (!seen_first_request) {
EXPECT_TRUE(request.has_node());
ASSERT_FALSE(request.node().client_features().empty());
EXPECT_EQ(request.node().client_features(0),
"envoy.lb.does_not_support_overprovisioning");
EXPECT_THAT(request.node().client_features(),
::testing::UnorderedElementsAre(
"envoy.lb.does_not_support_overprovisioning",
"xds.config.resource-in-sotw"));
CheckBuildVersion(request);
seen_first_request = true;
}
@ -677,6 +689,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
// - There is at least one subscription for the resource.
ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false;
grpc_core::Mutex clients_mu_;
std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);

Loading…
Cancel
Save