diff --git a/CMakeLists.txt b/CMakeLists.txt index 023d843552b..b29cd7b836a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1248,7 +1248,6 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx xds_bootstrap_test) add_dependencies(buildtests_cxx xds_certificate_provider_test) - add_dependencies(buildtests_cxx xds_client_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_cluster_end2end_test) endif() @@ -19890,54 +19889,6 @@ target_link_libraries(xds_certificate_provider_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(xds_client_test - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h - test/core/xds/xds_client_test.cc - test/core/xds/xds_transport_fake.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - -target_include_directories(xds_client_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(xds_client_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 79f33e592f3..bbb2a8cf445 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10851,21 +10851,6 @@ targets: - test/core/xds/xds_certificate_provider_test.cc deps: - grpc_test_util -- name: xds_client_test - gtest: true - build: test - language: c++ - headers: - - test/core/xds/xds_transport_fake.h - src: - - src/proto/grpc/testing/xds/v3/base.proto - - src/proto/grpc/testing/xds/v3/discovery.proto - - src/proto/grpc/testing/xds/v3/percent.proto - - test/core/xds/xds_client_test.cc - - test/core/xds/xds_transport_fake.cc - deps: - - grpc_test_util - uses_polling: false - name: xds_cluster_end2end_test gtest: true build: test diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 3f3bc8e33fd..030574d5140 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -433,11 +433,11 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, server, [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")]( absl::Status status) { - self->OnConnectivityFailure(std::move(status)); + self->OnConnectivityStateChange(std::move(status)); }, &status); GPR_ASSERT(transport_ != nullptr); - if (!status.ok()) SetChannelStatusLocked(std::move(status)); + if (!status.ok()) OnConnectivityStateChangeLocked(std::move(status)); } XdsClient::ChannelState::~ChannelState() { @@ -475,6 +475,10 @@ XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld() return lrs_calld_->calld(); } +bool XdsClient::ChannelState::HasActiveAdsCall() const { + return ads_calld_ != nullptr && ads_calld_->calld() != nullptr; +} + void XdsClient::ChannelState::MaybeStartLrsCall() { if (lrs_calld_ != nullptr) return; lrs_calld_.reset(new RetryableCall( @@ -518,54 +522,27 @@ void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type, } } -void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) { +void XdsClient::ChannelState::OnConnectivityStateChange(absl::Status status) { { MutexLock lock(&xds_client_->mu_); - SetChannelStatusLocked(std::move(status)); + OnConnectivityStateChangeLocked(std::move(status)); } xds_client_->work_serializer_.DrainQueue(); } -void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) { - if (shutting_down_) return; - status = absl::Status(status.code(), absl::StrCat("xDS channel for server ", - server_.server_uri(), ": ", - status.message())); - gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(), - status.ToString().c_str()); - // If the node ID is set, append that to the status message that we send to - // the watchers, so that it will appear in log messages visible to users. - const auto* node = xds_client_->bootstrap_->node(); - if (node != nullptr) { - status = absl::Status( - status.code(), - absl::StrCat(status.message(), - " (node ID:", xds_client_->bootstrap_->node()->id(), ")")); - } - // Save status in channel, so that we can immediately generate an - // error for any new watchers that may be started. - status_ = status; - // Find all watchers for this channel. - std::set> watchers; - for (const auto& a : xds_client_->authority_state_map_) { // authority - if (a.second.channel_state != this) continue; - for (const auto& t : a.second.resource_map) { // type - for (const auto& r : t.second) { // resource id - for (const auto& w : r.second.watchers) { // watchers - watchers.insert(w.second); - } - } - } +void XdsClient::ChannelState::OnConnectivityStateChangeLocked( + absl::Status status) { + if (!shutting_down_) { + // Notify all watchers of error. + gpr_log(GPR_INFO, + "[xds_client %p] xds channel for server %s in " + "state TRANSIENT_FAILURE: %s", + xds_client(), server_.server_uri().c_str(), + status.ToString().c_str()); + xds_client_->NotifyOnErrorLocked(absl::UnavailableError( + absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ", + status.ToString()))); } - // Enqueue notification for the watchers. - xds_client_->work_serializer_.Schedule( - [watchers = std::move(watchers), status = std::move(status)]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) { - for (const auto& watcher : watchers) { - watcher->OnError(status); - } - }, - DEBUG_LOCATION); } // @@ -732,32 +709,19 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( XdsResourceType::DecodeContext context = { xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace, xds_client()->symtab_.ptr(), arena}; - absl::StatusOr decode_result = + absl::StatusOr result = result_.type->Decode(context, serialized_resource, is_v2); - // If we didn't already have the resource name from the Resource - // wrapper, try to get it from the decoding result. - if (resource_name.empty()) { - if (decode_result.ok()) { - resource_name = decode_result->name; - error_prefix = - absl::StrCat("resource index ", idx, ": ", resource_name, ": "); - } else { - // We don't have any way of determining the resource name, so - // there's nothing more we can do here. - result_.errors.emplace_back( - absl::StrCat(error_prefix, decode_result.status().ToString())); - return; - } - } - // If decoding failed, make sure we include the error in the NACK. - const absl::Status& decode_status = decode_result.ok() - ? decode_result->resource.status() - : decode_result.status(); - if (!decode_status.ok()) { + if (!result.ok()) { result_.errors.emplace_back( - absl::StrCat(error_prefix, decode_status.ToString())); + absl::StrCat(error_prefix, result.status().ToString())); + return; } // Check the resource name. + if (resource_name.empty()) { + resource_name = result->name; + error_prefix = + absl::StrCat("resource index ", idx, ": ", resource_name, ": "); + } auto parsed_resource_name = xds_client()->ParseXdsResourceName(resource_name, result_.type); if (!parsed_resource_name.ok()) { @@ -814,12 +778,16 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( resource_state.ignored_deletion = false; } // Update resource state based on whether the resource is valid. - if (!decode_status.ok()) { + if (!result->resource.ok()) { + result_.errors.emplace_back(absl::StrCat( + error_prefix, + "validation error: ", result->resource.status().ToString())); xds_client()->NotifyWatchersOnErrorLocked( resource_state.watchers, - absl::UnavailableError( - absl::StrCat("invalid resource: ", decode_status.ToString()))); - UpdateResourceMetadataNacked(result_.version, decode_status.ToString(), + absl::UnavailableError(absl::StrCat( + "invalid resource: ", result->resource.status().ToString()))); + UpdateResourceMetadataNacked(result_.version, + result->resource.status().ToString(), update_time_, &resource_state.meta); return; } @@ -828,7 +796,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( // If it didn't change, ignore it. if (resource_state.resource != nullptr && result_.type->ResourcesEqual(resource_state.resource.get(), - decode_result->resource->get())) { + result->resource->get())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] %s resource %s identical to current, ignoring.", @@ -838,7 +806,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( return; } // Update the resource state. - resource_state.resource = std::move(*decode_result->resource); + resource_state.resource = std::move(*result->resource); resource_state.meta = CreateResourceMetadataAcked( std::string(serialized_resource), result_.version, update_time_); // Notify watchers. @@ -1019,12 +987,11 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( status.ToString().c_str()); } else { seen_response_ = true; - chand()->status_ = absl::OkStatus(); AdsResponseParser::Result result = parser.TakeResult(); // Update nonce. auto& state = state_map_[result.type]; state.nonce = result.nonce; - // If we got an error, set state.status so that we'll NACK the update. + // If we got an error, set state.error so that we'll NACK the update. if (!result.errors.empty()) { state.status = absl::UnavailableError( absl::StrCat("xDS response validation errors: [", @@ -1117,9 +1084,10 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( if (IsCurrentCallOnChannel()) { // Try to restart the call. parent_->OnCallFinishedLocked(); - // Send error to all watchers for the channel. - chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat( - "xDS call failed; status: %s", status.ToString().c_str()))); + // Send error to all watchers. + xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( + "xDS call failed: xDS server: %s, ADS call status: %s", + chand()->server_.server_uri(), status.ToString().c_str()))); } } xds_client()->work_serializer_.DrainQueue(); @@ -1493,16 +1461,17 @@ void XdsClient::WatchResource(const XdsResourceType* type, invalid_watchers_[w] = watcher; } work_serializer_.Run( - [watcher = std::move(watcher), status = std::move(status)]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - watcher->OnError(status); - }, + // TODO(yashykt): When we move to C++14, capture watcher using + // std::move() + [watcher, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnError(status); + }, DEBUG_LOCATION); }; auto resource_name = ParseXdsResourceName(name, type); if (!resource_name.ok()) { - fail(absl::UnavailableError( - absl::StrCat("Unable to parse resource name ", name))); + fail(absl::UnavailableError(absl::StrFormat( + "Unable to parse resource name for listener %s", name))); return; } // Find server to use. @@ -1542,39 +1511,6 @@ void XdsClient::WatchResource(const XdsResourceType* type, delete value; }, DEBUG_LOCATION); - } else if (resource_state.meta.client_status == - XdsApi::ResourceMetadata::DOES_NOT_EXIST) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] reporting cached does-not-exist for %s", this, - std::string(name).c_str()); - } - work_serializer_.Schedule( - [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - watcher->OnResourceDoesNotExist(); - }, - DEBUG_LOCATION); - } else if (resource_state.meta.client_status == - XdsApi::ResourceMetadata::NACKED) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log( - GPR_INFO, - "[xds_client %p] reporting cached validation failure for %s: %s", - this, std::string(name).c_str(), - resource_state.meta.failed_details.c_str()); - } - std::string details = resource_state.meta.failed_details; - const auto* node = bootstrap_->node(); - if (node != nullptr) { - absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")"); - } - work_serializer_.Schedule( - [watcher, details = std::move(details)]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - watcher->OnError(absl::UnavailableError( - absl::StrCat("invalid resource: ", details))); - }, - DEBUG_LOCATION); } // If the authority doesn't yet have a channel, set it, creating it if // needed. @@ -1582,21 +1518,6 @@ void XdsClient::WatchResource(const XdsResourceType* type, authority_state.channel_state = GetOrCreateChannelStateLocked(*xds_server, "start watch"); } - absl::Status channel_status = authority_state.channel_state->status(); - if (!channel_status.ok()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] returning cached channel error for %s: %s", - this, std::string(name).c_str(), - channel_status.ToString().c_str()); - } - work_serializer_.Schedule( - [watcher = std::move(watcher), status = std::move(channel_status)]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable { - watcher->OnError(std::move(status)); - }, - DEBUG_LOCATION); - } authority_state.channel_state->SubscribeLocked(type, *resource_name); } work_serializer_.DrainQueue(); @@ -1860,6 +1781,34 @@ void XdsClient::ResetBackoff() { } } +void XdsClient::NotifyOnErrorLocked(absl::Status status) { + const auto* node = bootstrap_->node(); + if (node != nullptr) { + status = absl::Status( + status.code(), + absl::StrCat(status.message(), " (node ID:", node->id(), ")")); + } + std::set> watchers; + for (const auto& a : authority_state_map_) { // authority + for (const auto& t : a.second.resource_map) { // type + for (const auto& r : t.second) { // resource id + for (const auto& w : r.second.watchers) { // watchers + watchers.insert(w.second); + } + } + } + } + work_serializer_.Schedule( + // TODO(yashykt): When we move to C++14, capture watchers using + // std::move() + [watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + for (const auto& watcher : watchers) { + watcher->OnError(status); + } + }, + DEBUG_LOCATION); +} + void XdsClient::NotifyWatchersOnErrorLocked( const std::map>& watchers, @@ -1871,12 +1820,11 @@ void XdsClient::NotifyWatchersOnErrorLocked( absl::StrCat(status.message(), " (node ID:", node->id(), ")")); } work_serializer_.Schedule( - [watchers, status = std::move(status)]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - for (const auto& p : watchers) { - p.first->OnError(status); - } - }, + [watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + for (const auto& p : watchers) { + p.first->OnError(status); + } + }, DEBUG_LOCATION); } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index b936e9ac82c..b5fbabb9651 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -186,9 +186,8 @@ class XdsClient : public DualRefCounted { void MaybeStartLrsCall(); void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - // Returns non-OK if there has been an error since the last time the - // ADS stream saw a response. - const absl::Status& status() const { return status_; } + bool HasAdsCall() const; + bool HasActiveAdsCall() const; void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) @@ -199,11 +198,8 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); private: - void OnConnectivityFailure(absl::Status status); - - // Enqueues error notifications to watchers. Caller must drain - // XdsClient::work_serializer_ after releasing the lock. - void SetChannelStatusLocked(absl::Status status) + void OnConnectivityStateChange(absl::Status status); + void OnConnectivityStateChangeLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); // The owning xds client. @@ -222,8 +218,6 @@ class XdsClient : public DualRefCounted { // Stores the most recent accepted resource version for each resource type. std::map resource_type_version_map_; - - absl::Status status_; }; struct ResourceState { @@ -265,6 +259,9 @@ class XdsClient : public DualRefCounted { LoadReportMap load_report_map; }; + // Sends an error notification to all watchers. + void NotifyOnErrorLocked(absl::Status status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Sends an error notification to a specific set of watchers. void NotifyWatchersOnErrorLocked( const std::map #include -#include "absl/strings/str_cat.h" - #include #include #include @@ -237,9 +235,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - on_connectivity_failure_(absl::Status( - status.code(), - absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message()))); + on_connectivity_failure_(status); } } diff --git a/src/core/lib/json/json_object_loader.h b/src/core/lib/json/json_object_loader.h index baf61afbbcf..2fbec033136 100644 --- a/src/core/lib/json/json_object_loader.h +++ b/src/core/lib/json/json_object_loader.h @@ -51,7 +51,7 @@ // struct Foo { // int a; // int b; -// static const JsonLoaderInterface* JsonLoader(const JsonArgs& args) { +// static const JsonLoaderInterface* JsonLoader() { // // Note: Field names must be string constants; they are not copied. // static const auto* loader = JsonObjectLoader() // .Field("a", &Foo::a) @@ -60,10 +60,7 @@ // return loader; // } // // Optional; omit if no post-processing needed. -// void JsonPostLoad(const Json& source, const JsonArgs& args, -// ErrorList* errors) { -// ++a; -// } +// void JsonPostLoad(const Json& source, ErrorList* errors) { ++a; } // }; // Now we can load Foo objects from JSON: // absl::StatusOr foo = LoadFromJson(json); diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD index e77e713d6a2..789fdbc4562 100644 --- a/test/core/xds/BUILD +++ b/test/core/xds/BUILD @@ -12,12 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -load( - "//bazel:grpc_build_system.bzl", - "grpc_cc_library", - "grpc_cc_test", - "grpc_package", -) +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") grpc_package(name = "test/core/xds") @@ -121,36 +116,3 @@ grpc_cc_test( "//test/cpp/util:grpc_cli_utils", ], ) - -grpc_cc_library( - name = "xds_transport_fake", - testonly = True, - srcs = ["xds_transport_fake.cc"], - hdrs = ["xds_transport_fake.h"], - external_deps = [ - "absl/strings", - "absl/types:optional", - ], - language = "C++", - deps = [ - "//:orphanable", - "//:ref_counted_ptr", - "//:xds_client", - "//test/core/util:grpc_test_util", - ], -) - -grpc_cc_test( - name = "xds_client_test", - srcs = ["xds_client_test.cc"], - external_deps = ["gtest"], - language = "C++", - uses_event_engine = True, - uses_polling = False, - deps = [ - ":xds_transport_fake", - "//:xds_client", - "//src/proto/grpc/testing/xds/v3:discovery_proto", - "//test/core/util:grpc_test_util", - ], -) diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc deleted file mode 100644 index 65ae89f055d..00000000000 --- a/test/core/xds/xds_client_test.cc +++ /dev/null @@ -1,2303 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// TODO(roth): Add the following tests: -// - tests for DumpClientConfigBinary() -// - tests for load-reporting APIs? (or maybe move those out of XdsClient?) - -#include "src/core/ext/xds/xds_client.h" - -#include -#include -#include -#include - -#include -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_resource_type_impl.h" -#include "src/core/lib/gpr/env.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/json/json.h" -#include "src/core/lib/json/json_object_loader.h" -#include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h" -#include "test/core/util/test_config.h" -#include "test/core/xds/xds_transport_fake.h" - -using envoy::service::discovery::v3::DiscoveryRequest; -using envoy::service::discovery::v3::DiscoveryResponse; - -namespace grpc_core { -namespace testing { -namespace { - -class XdsClientTest : public ::testing::Test { - protected: - // A fake bootstrap implementation that allows tests to populate the - // fields however they want. - class FakeXdsBootstrap : public XdsBootstrap { - public: - class FakeNode : public Node { - public: - const std::string& id() const override { return id_; } - const std::string& cluster() const override { return cluster_; } - const std::string& locality_region() const override { - return locality_region_; - } - const std::string& locality_zone() const override { - return locality_zone_; - } - const std::string& locality_sub_zone() const override { - return locality_sub_zone_; - } - const Json::Object& metadata() const override { return metadata_; } - - void set_id(std::string id) { id_ = std::move(id); } - void set_cluster(std::string cluster) { cluster_ = std::move(cluster); } - void set_locality_region(std::string locality_region) { - locality_region_ = std::move(locality_region); - } - void set_locality_zone(std::string locality_zone) { - locality_zone_ = std::move(locality_zone); - } - void set_locality_sub_zone(std::string locality_sub_zone) { - locality_sub_zone_ = std::move(locality_sub_zone); - } - void set_metadata(Json::Object metadata) { - metadata_ = std::move(metadata); - } - - private: - std::string id_ = "xds_client_test"; - std::string cluster_; - std::string locality_region_; - std::string locality_zone_; - std::string locality_sub_zone_; - Json::Object metadata_; - }; - - class FakeXdsServer : public XdsServer { - public: - const std::string& server_uri() const override { return server_uri_; } - bool ShouldUseV3() const override { return use_v3_; } - bool IgnoreResourceDeletion() const override { - return ignore_resource_deletion_; - } - bool Equals(const XdsServer& other) const override { - const auto& o = static_cast(other); - return server_uri_ == o.server_uri_ && use_v3_ == o.use_v3_ && - ignore_resource_deletion_ == o.ignore_resource_deletion_; - } - - void set_server_uri(std::string server_uri) { - server_uri_ = std::move(server_uri); - } - void set_use_v3(bool use_v3) { use_v3_ = use_v3; } - void set_ignore_resource_deletion(bool ignore_resource_deletion) { - ignore_resource_deletion_ = ignore_resource_deletion; - } - - private: - std::string server_uri_ = "default_xds_server"; - bool use_v3_ = true; - bool ignore_resource_deletion_ = false; - }; - - class FakeAuthority : public Authority { - public: - const XdsServer* server() const override { - return server_.has_value() ? &*server_ : nullptr; - } - - void set_server(absl::optional server) { - server_ = std::move(server); - } - - private: - absl::optional server_; - }; - - class Builder { - public: - Builder() { node_.emplace(); } - - Builder& set_use_v2() { - server_.set_use_v3(false); - return *this; - } - Builder& set_node_id(std::string id) { - if (!node_.has_value()) node_.emplace(); - node_->set_id(std::move(id)); - return *this; - } - Builder& AddAuthority(std::string name, FakeAuthority authority) { - authorities_[std::move(name)] = std::move(authority); - return *this; - } - std::unique_ptr Build() { - auto bootstrap = absl::make_unique(); - bootstrap->server_ = std::move(server_); - bootstrap->node_ = std::move(node_); - bootstrap->authorities_ = std::move(authorities_); - return bootstrap; - } - - private: - FakeXdsServer server_; - absl::optional node_; - std::map authorities_; - }; - - std::string ToString() const override { return ""; } - - const XdsServer& server() const override { return server_; } - const Node* node() const override { - return node_.has_value() ? &*node_ : nullptr; - } - const Authority* LookupAuthority(const std::string& name) const override { - auto it = authorities_.find(name); - if (it == authorities_.end()) return nullptr; - return &it->second; - } - const XdsServer* FindXdsServer(const XdsServer& server) const override { - const auto& fake_server = static_cast(server); - if (fake_server == server_) return &server_; - for (const auto& p : authorities_) { - const auto* authority_server = - static_cast(p.second.server()); - if (authority_server != nullptr && *authority_server == fake_server) { - return authority_server; - } - } - return nullptr; - } - - private: - FakeXdsServer server_; - absl::optional node_; - std::map authorities_; - }; - - // A template for a test xDS resource type with an associated watcher impl. - // For simplicity, we use JSON instead of proto for serialization. - // The specified ResourceStruct must provide the following: - // - a static JsonLoader() method, as described in json_object_loader.h - // - an AsJsonString() method that returns the object in JSON string form - // - a static TypeUrl() method that returns the V3 resource type - // - a static TypeUrlV2() method that returns the V2 resource type - template - class XdsTestResourceType - : public XdsResourceTypeImpl, - ResourceStruct> { - public: - // A watcher implementation that queues delivered watches. - class Watcher - : public XdsResourceTypeImpl, - ResourceStruct>::WatcherInterface { - public: - bool ExpectNoEvent(absl::Duration timeout) { - MutexLock lock(&mu_); - return !WaitForEventLocked(timeout); - } - - bool HasEvent() { - MutexLock lock(&mu_); - return !queue_.empty(); - } - - absl::optional WaitForNextResource( - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return absl::nullopt; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) - ? "error" - : "does-not-exist") - << " at " << location.file() << ":" << location.line(); - return absl::nullopt; - } - ResourceStruct foo = std::move(absl::get(event)); - queue_.pop_front(); - return std::move(foo); - } - - absl::optional WaitForNextError( - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return absl::nullopt; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) - ? "resource" - : "does-not-exist") - << " at " << location.file() << ":" << location.line(); - return absl::nullopt; - } - absl::Status error = std::move(absl::get(event)); - queue_.pop_front(); - return std::move(error); - } - - bool WaitForDoesNotExist(absl::Duration timeout, - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return false; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) ? "resource" - : "error") - << " at " << location.file() << ":" << location.line(); - return false; - } - queue_.pop_front(); - return true; - } - - private: - struct DoesNotExist {}; - using Event = absl::variant; - - void OnResourceChanged(ResourceStruct foo) override { - MutexLock lock(&mu_); - queue_.push_back(std::move(foo)); - cv_.Signal(); - } - void OnError(absl::Status status) override { - MutexLock lock(&mu_); - queue_.push_back(std::move(status)); - cv_.Signal(); - } - void OnResourceDoesNotExist() override { - MutexLock lock(&mu_); - queue_.push_back(DoesNotExist()); - cv_.Signal(); - } - - bool WaitForEventLocked(absl::Duration timeout) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { - while (queue_.empty()) { - if (cv_.WaitWithTimeout(&mu_, - timeout * grpc_test_slowdown_factor())) { - return false; - } - } - return true; - } - - Mutex mu_; - CondVar cv_; - std::deque queue_ ABSL_GUARDED_BY(&mu_); - }; - - absl::string_view type_url() const override { - return ResourceStruct::TypeUrl(); - } - absl::string_view v2_type_url() const override { - return ResourceStruct::TypeUrlV2(); - } - absl::StatusOr Decode( - const XdsResourceType::DecodeContext& /*context*/, - absl::string_view serialized_resource, bool /*is_v2*/) const override { - auto json = Json::Parse(serialized_resource); - if (!json.ok()) return json.status(); - absl::StatusOr foo = LoadFromJson(*json); - XdsResourceType::DecodeResult result; - if (!foo.ok()) { - auto it = json->object_value().find("name"); - if (it == json->object_value().end()) { - return absl::InvalidArgumentError( - "cannot determine name for invalid resource"); - } - result.name = it->second.string_value(); - result.resource = foo.status(); - } else { - result.name = foo->name; - auto resource = absl::make_unique, - ResourceStruct>::ResourceDataSubclass>(); - resource->resource = std::move(*foo); - result.resource = std::move(resource); - } - return std::move(result); - } - void InitUpbSymtab(upb_DefPool* /*symtab*/) const override {} - - static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) { - google::protobuf::Any any; - any.set_type_url( - absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl())); - any.set_value(resource.AsJsonString()); - return any; - } - }; - - // A fake "Foo" xDS resource type. - struct XdsFooResource { - std::string name; - uint32_t value; - - bool operator==(const XdsFooResource& other) const { - return name == other.name && value == other.value; - } - - std::string AsJsonString() const { - return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}"); - } - - static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { - static const auto* loader = JsonObjectLoader() - .Field("name", &XdsFooResource::name) - .Field("value", &XdsFooResource::value) - .Finish(); - return loader; - } - - static absl::string_view TypeUrl() { return "test.v3.foo"; } - static absl::string_view TypeUrlV2() { return "test.v2.foo"; } - }; - using XdsFooResourceType = XdsTestResourceType; - - // A fake "Bar" xDS resource type. - struct XdsBarResource { - std::string name; - std::string value; - - bool operator==(const XdsBarResource& other) const { - return name == other.name && value == other.value; - } - - std::string AsJsonString() const { - return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, - "\"}"); - } - - static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { - static const auto* loader = JsonObjectLoader() - .Field("name", &XdsBarResource::name) - .Field("value", &XdsBarResource::value) - .Finish(); - return loader; - } - - static absl::string_view TypeUrl() { return "test.v3.bar"; } - static absl::string_view TypeUrlV2() { return "test.v2.bar"; } - }; - using XdsBarResourceType = XdsTestResourceType; - - // A helper class to build and serialize a DiscoveryResponse. - class ResponseBuilder { - public: - explicit ResponseBuilder(absl::string_view type_url) { - response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url)); - } - - ResponseBuilder& set_version_info(absl::string_view version_info) { - response_.set_version_info(std::string(version_info)); - return *this; - } - ResponseBuilder& set_nonce(absl::string_view nonce) { - response_.set_nonce(std::string(nonce)); - return *this; - } - - template - ResponseBuilder& AddResource( - const decltype(ResourceType::ResourceDataSubclass::resource)& resource, - bool in_resource_wrapper = false) { - auto* res = response_.add_resources(); - *res = ResourceType::EncodeAsAny(resource); - if (in_resource_wrapper) { - envoy::service::discovery::v3::Resource resource_wrapper; - resource_wrapper.set_name(resource.name); - *resource_wrapper.mutable_resource() = std::move(*res); - res->PackFrom(resource_wrapper); - } - return *this; - } - - ResponseBuilder& AddFooResource(const XdsFooResource& resource, - bool in_resource_wrapper = false) { - return AddResource(resource, in_resource_wrapper); - } - - ResponseBuilder& AddBarResource(const XdsBarResource& resource, - bool in_resource_wrapper = false) { - return AddResource(resource, in_resource_wrapper); - } - - ResponseBuilder& AddInvalidResource( - absl::string_view type_url, absl::string_view value, - absl::string_view resource_wrapper_name = "") { - auto* res = response_.add_resources(); - res->set_type_url(absl::StrCat("type.googleapis.com/", type_url)); - res->set_value(std::string(value)); - if (!resource_wrapper_name.empty()) { - envoy::service::discovery::v3::Resource resource_wrapper; - resource_wrapper.set_name(std::string(resource_wrapper_name)); - *resource_wrapper.mutable_resource() = std::move(*res); - res->PackFrom(resource_wrapper); - } - return *this; - } - - std::string Serialize() { - std::string serialized_response; - EXPECT_TRUE(response_.SerializeToString(&serialized_response)); - return serialized_response; - } - - private: - DiscoveryResponse response_; - }; - - class ScopedExperimentalEnvVar { - public: - explicit ScopedExperimentalEnvVar(const char* env_var) : env_var_(env_var) { - gpr_setenv(env_var_, "true"); - } - - ~ScopedExperimentalEnvVar() { gpr_unsetenv(env_var_); } - - private: - const char* env_var_; - }; - - // Sets transport_factory_ and initializes xds_client_ with the - // specified bootstrap config. - void InitXdsClient( - FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), - Duration resource_request_timeout = Duration::Seconds(15) * - grpc_test_slowdown_factor()) { - auto transport_factory = MakeOrphanable(); - transport_factory_ = transport_factory->Ref(); - xds_client_ = MakeRefCounted(bootstrap_builder.Build(), - std::move(transport_factory), - resource_request_timeout); - } - - // Starts and cancels a watch for a Foo resource. - RefCountedPtr StartFooWatch( - absl::string_view resource_name) { - auto watcher = MakeRefCounted(); - XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); - return watcher; - } - void CancelFooWatch(XdsFooResourceType::Watcher* watcher, - absl::string_view resource_name, - bool delay_unsubscription = false) { - XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, - delay_unsubscription); - } - - // Starts and cancels a watch for a Bar resource. - RefCountedPtr StartBarWatch( - absl::string_view resource_name) { - auto watcher = MakeRefCounted(); - XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); - return watcher; - } - void CancelBarWatch(XdsBarResourceType::Watcher* watcher, - absl::string_view resource_name, - bool delay_unsubscription = false) { - XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, - delay_unsubscription); - } - - RefCountedPtr WaitForAdsStream( - const XdsBootstrap::XdsServer& server, - absl::Duration timeout = absl::Seconds(5)) { - const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); - GPR_ASSERT(xds_server != nullptr); - return transport_factory_->WaitForStream( - *xds_server, FakeXdsTransportFactory::kAdsMethod, - timeout * grpc_test_slowdown_factor()); - } - - void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, - absl::Status status) { - const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); - GPR_ASSERT(xds_server != nullptr); - transport_factory_->TriggerConnectionFailure(*xds_server, status); - } - - RefCountedPtr WaitForAdsStream( - absl::Duration timeout = absl::Seconds(5)) { - return WaitForAdsStream(xds_client_->bootstrap().server(), timeout); - } - - // Gets the latest request sent to the fake xDS server. - absl::optional WaitForRequest( - FakeXdsTransportFactory::FakeStreamingCall* stream, - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - auto message = - stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor()); - if (!message.has_value()) return absl::nullopt; - DiscoveryRequest request; - bool success = request.ParseFromString(*message); - EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at " - << location.file() << ":" << location.line(); - if (!success) return absl::nullopt; - return std::move(request); - } - - // Helper function to check the fields of a DiscoveryRequest. - void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url, - absl::string_view version_info, - absl::string_view response_nonce, absl::Status error_detail, - std::set resource_names, - SourceLocation location = SourceLocation()) { - EXPECT_EQ(request.type_url(), - absl::StrCat("type.googleapis.com/", type_url)) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.version_info(), version_info) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.response_nonce(), response_nonce) - << location.file() << ":" << location.line(); - if (error_detail.ok()) { - EXPECT_FALSE(request.has_error_detail()) - << location.file() << ":" << location.line(); - } else { - EXPECT_EQ(request.error_detail().code(), - static_cast(error_detail.code())) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.error_detail().message(), error_detail.message()) - << location.file() << ":" << location.line(); - } - EXPECT_THAT(request.resource_names(), - ::testing::UnorderedElementsAreArray(resource_names)) - << location.file() << ":" << location.line(); - } - - // Helper function to check the contents of the node message in a - // request against the client's node info. - void CheckRequestNode(const DiscoveryRequest& request, - bool check_build_version = false, - SourceLocation location = SourceLocation()) { - // These fields come from the bootstrap config. - EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().cluster(), - xds_client_->bootstrap().node()->cluster()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().region(), - xds_client_->bootstrap().node()->locality_region()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().zone(), - xds_client_->bootstrap().node()->locality_zone()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().sub_zone(), - xds_client_->bootstrap().node()->locality_sub_zone()) - << location.file() << ":" << location.line(); - if (xds_client_->bootstrap().node()->metadata().empty()) { - EXPECT_FALSE(request.node().has_metadata()) - << location.file() << ":" << location.line(); - } else { - std::string metadata_json_str; - auto status = - MessageToJsonString(request.node().metadata(), &metadata_json_str, - google::protobuf::util::JsonPrintOptions()); - ASSERT_TRUE(status.ok()) - << status << " on " << location.file() << ":" << location.line(); - auto metadata_json = Json::Parse(metadata_json_str); - ASSERT_TRUE(metadata_json.ok()) - << metadata_json.status() << " on " << location.file() << ":" - << location.line(); - EXPECT_EQ(*metadata_json, xds_client_->bootstrap().node()->metadata()) - << location.file() << ":" << location.line() << ":\nexpected: " - << Json{xds_client_->bootstrap().node()->metadata()}.Dump() - << "\nactual: " << metadata_json->Dump(); - } - // These are hard-coded by XdsClient. - EXPECT_EQ(request.node().user_agent_name(), - absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().user_agent_version(), - absl::StrCat("C-core ", grpc_version_string())) - << location.file() << ":" << location.line(); - if (check_build_version) { - auto build_version = GetBuildVersion(request.node()); - ASSERT_TRUE(build_version.has_value()) - << location.file() << ":" << location.line(); - EXPECT_EQ(*build_version, - absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ", - grpc_version_string())) - << location.file() << ":" << location.line(); - } - } - - // Helper function to find the "build_version" field, which was - // removed in v3, but which we still populate in v2. - static absl::optional GetBuildVersion( - const envoy::config::core::v3::Node& node) { - const auto& unknown_field_set = - node.GetReflection()->GetUnknownFields(node); - for (int i = 0; i < unknown_field_set.field_count(); ++i) { - const auto& unknown_field = unknown_field_set.field(i); - if (unknown_field.number() == 5) return unknown_field.length_delimited(); - } - return absl::nullopt; - } - - RefCountedPtr transport_factory_; - RefCountedPtr xds_client_; -}; - -TEST_F(XdsClientTest, BasicWatch) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, UpdateFromServer) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Server sends an updated version of the resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo1", 9}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, MultipleWatchersForSameResource) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a second watcher for the same resource. - auto watcher2 = StartFooWatch("foo1"); - // This watcher should get an immediate notification, because the - // resource is already cached. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // Server should not have seen another request from the client. - ASSERT_FALSE(stream->HaveMessageFromClient()); - // Server sends an updated version of the resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo1", 9}) - .Serialize()); - // XdsClient should deliver the response to both watchers. - resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel one of the watchers. - CancelFooWatch(watcher.get(), "foo1"); - // The server should not see any new request. - ASSERT_FALSE(WaitForRequest(stream.get())); - // Now cancel the second watcher. - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, SubscribeToMultipleResources) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for "foo2". - auto watcher2 = StartFooWatch("foo2"); - // XdsClient should have sent a subscription request on the ADS stream. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo2", 7}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo2"); - EXPECT_EQ(resource->value, 7); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); - // Now cancel watch for "foo2". - CancelFooWatch(watcher2.get(), "foo2"); - // The XdsClient may or may not send another unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for "foo2". - auto watcher2 = StartFooWatch("foo2"); - // XdsClient should have sent a subscription request on the ADS stream. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo2", 7}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo2"); - EXPECT_EQ(resource->value, 7); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Server sends an update for "foo1". The response does not contain "foo2". - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("C") - .AddFooResource(XdsFooResource{"foo1", 9}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"C", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"C", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); - // Now cancel watch for "foo2". - CancelFooWatch(watcher2.get(), "foo2"); - // The XdsClient may or may not send another unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"C", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceValidationFailure) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response containing an invalid resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddInvalidResource(XdsFooResourceType::Get()->type_url(), - "{\"name\":\"foo1\",\"value\":[]}") - .Serialize()); - // XdsClient should deliver an error to the watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number] (node ID:xds_client_test)") - << *error; - // XdsClient should NACK the update. - // Note that version_info is not populated in the request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest( - *request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"A", - /*error_detail=*/ - absl::InvalidArgumentError( - "xDS response validation errors: [" - "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number]]"), - /*resource_names=*/{"foo1"}); - // Start a second watch for the same resource. It should immediately - // receive the same error. - auto watcher2 = StartFooWatch("foo1"); - error = watcher2->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number] (node ID:xds_client_test)") - << *error; - // Now server sends an updated version of the resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo1", 9}) - .Serialize()); - // XdsClient should deliver the response to both watchers. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 9); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Before the server responds, add a watch for another resource. - auto watcher2 = StartFooWatch("foo2"); - // Client should send another request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Add a watch for a third resource. - auto watcher3 = StartFooWatch("foo3"); - // Client should send another request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2", "foo3"}); - // Add a watch for a fourth resource. - auto watcher4 = StartFooWatch("foo4"); - // Client should send another request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); - // Server sends a response containing three invalid resources and one - // valid resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - // foo1: JSON parsing succeeds, so we know the resource name, - // but validation fails. - .AddInvalidResource(XdsFooResourceType::Get()->type_url(), - "{\"name\":\"foo1\",\"value\":[]}") - // foo2: JSON parsing fails, and not wrapped in a Resource - // wrapper, so we don't actually know the resource's name. - .AddInvalidResource(XdsFooResourceType::Get()->type_url(), - "{\"name\":\"foo2,\"value\":6}") - // foo3: JSON parsing fails, but it is wrapped in a Resource - // wrapper, so we do know the resource's name. - .AddInvalidResource(XdsFooResourceType::Get()->type_url(), - "{\"name\":\"foo3,\"value\":6}", - /*resource_wrapper_name=*/"foo3") - // foo4: valid resource. - .AddFooResource(XdsFooResource{"foo4", 5}) - .Serialize()); - // XdsClient should deliver an error to the watchers for foo1 and foo3. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number] (node ID:xds_client_test)") - << *error; - error = watcher3->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: JSON parsing failed: " - "[JSON parse error at index 15] (node ID:xds_client_test)") - << *error; - // It cannot delivery an error for foo2, because the client doesn't know - // that that resource in the response was actually supposed to be foo2. - EXPECT_FALSE(watcher2->HasEvent()); - // It will delivery a valid resource update for foo4. - auto resource = watcher4->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo4"); - EXPECT_EQ(resource->value, 5); - // XdsClient should NACK the update. - // There was one good resource, so the version will be updated. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/ - absl::InvalidArgumentError( - "xDS response validation errors: [" - // foo1 - "resource index 0: foo1: " - "INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number]; " - // foo2 (name not known) - "resource index 1: INVALID_ARGUMENT: JSON parsing failed: " - "[JSON parse error at index 15]; " - // foo3 - "resource index 2: foo3: " - "INVALID_ARGUMENT: JSON parsing failed: " - "[JSON parse error at index 15]]"), - /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); - // Cancel watches. - CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); - CancelFooWatch(watcher.get(), "foo2", /*delay_unsubscription=*/true); - CancelFooWatch(watcher.get(), "foo3", /*delay_unsubscription=*/true); - CancelFooWatch(watcher.get(), "foo4"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Send an update containing an invalid resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddInvalidResource(XdsFooResourceType::Get()->type_url(), - "{\"name\":\"foo1\",\"value\":[]}") - .Serialize()); - // XdsClient should deliver an error to the watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number] (node ID:xds_client_test)") - << *error; - // XdsClient should NACK the update. - // Note that version_info is set to the previous version in this request, - // because there were no valid resources in it. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest( - *request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/ - absl::InvalidArgumentError( - "xDS response validation errors: [" - "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " - "[field:value error:is not a number]]"), - /*resource_names=*/{"foo1"}); - // Start a second watcher for the same resource. Even though the last - // update was a NACK, we should still deliver the cached resource to - // the watcher. - // TODO(roth): Consider what the right behavior is here. It seems - // inconsistent that the watcher sees the error if it had started - // before the error was seen but does not if it was started afterwards. - // One option is to not send errors at all for already-cached resources; - // another option is to send the errors even for newly started watchers. - auto watcher2 = StartFooWatch("foo1"); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // Cancel watches. - CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceDoesNotExist) { - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Do not send a response, but wait for the resource to be reported as - // not existing. - EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); - // Start a new watcher for the same resource. It should immediately - // receive the same does-not-exist notification. - auto watcher2 = StartFooWatch("foo1"); - EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); - // Now server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watchers. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, StreamClosedByServer) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Now server closes the stream. - stream->MaybeSendStatusToClient(absl::OkStatus()); - // XdsClient should report error to watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed; " - "status: OK (node ID:xds_client_test)") - << *error; - // XdsClient should create a new stream. - stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient sends a subscription request. - // Note that the version persists from the previous stream, but the - // nonce does not. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Before the server resends the resource, start a new watcher for the - // same resource. This watcher should immediately receive the cached - // resource and then the error notification -- in that order. - auto watcher2 = StartFooWatch("foo1"); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - error = watcher2->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed; " - "status: OK (node ID:xds_client_test)") - << *error; - // Create a watcher for a new resource. This should immediately - // receive the cached channel error. - auto watcher3 = StartFooWatch("foo2"); - error = watcher3->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed; " - "status: OK (node ID:xds_client_test)") - << *error; - // And the client will send a new request subscribing to the new resource. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Server now sends the requested resources. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("B") - .AddFooResource(XdsFooResource{"foo1", 6}) - .AddFooResource(XdsFooResource{"foo2", 7}) - .Serialize()); - // Watchers for foo1 do NOT get an update, since the resource has not changed. - EXPECT_FALSE(watcher->WaitForNextResource()); - EXPECT_FALSE(watcher2->WaitForNextResource()); - // The watcher for foo2 gets the newly delivered resource. - resource = watcher3->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo2"); - EXPECT_EQ(resource->value, 7); - // XdsClient sends an ACK. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // Cancel watchers. - CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); - CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true); - CancelFooWatch(watcher3.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) { - // Lower resources-does-not-exist timeout, to make sure that we're not - // triggering that here. - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Now server closes the stream. - stream->MaybeSendStatusToClient(absl::OkStatus()); - // XdsClient should report error to watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed; " - "status: OK (node ID:xds_client_test)") - << *error; - // XdsClient should create a new stream. - stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient sends a subscription request. - // Note that the version persists from the previous stream, but the - // nonce does not. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server does NOT send the resource again. - // Watcher should not get any update, since the resource has not changed. - // We wait 5s here to ensure we don't trigger the resource-does-not-exist - // timeout. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ConnectionFails) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Transport reports connection failure. - TriggerConnectionFailure(xds_client_->bootstrap().server(), - absl::UnavailableError("connection failed")); - // XdsClient should report an error to the watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: " - "connection failed (node ID:xds_client_test)") - << *error; - // Start a new watch. This watcher should be given the same error, - // since we have not yet recovered. - auto watcher2 = StartFooWatch("foo1"); - error = watcher2->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: " - "connection failed (node ID:xds_client_test)") - << *error; - // Inside the XdsTransport interface, the channel will eventually - // reconnect, and the call will proceed. None of that will be visible - // to the XdsClient, because the call uses wait_for_ready. So here, - // to simulate the connection being established, all we need to do is - // allow the stream to proceed. - // Server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watchers. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response with the resource wrapped in a Resource message. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}, - /*in_resource_wrapper=*/true) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, MultipleResourceTypes) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for "bar1". - auto watcher2 = StartBarWatch("bar1"); - // XdsClient should have sent a subscription request on the ADS stream. - // Note that version and nonce here do NOT use the values for Foo, - // since each resource type has its own state. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsBarResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"bar1"}); - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsBarResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddBarResource(XdsBarResource{"bar1", "whee"}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource2 = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource2.has_value()); - EXPECT_EQ(resource2->name, "bar1"); - EXPECT_EQ(resource2->value, "whee"); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsBarResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"bar1"}); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - // Now cancel watch for "bar1". - CancelBarWatch(watcher2.get(), "bar1"); - // The XdsClient may or may not send another unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsBarResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, BasicWatchV2) { - InitXdsClient(FakeXdsBootstrap::Builder().set_use_v2()); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = transport_factory_->WaitForStream( - xds_client_->bootstrap().server(), FakeXdsTransportFactory::kAdsV2Method, - absl::Seconds(5) * grpc_test_slowdown_factor()); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Node Should be present on the first request. - CheckRequestNode(*request, /*check_build_version=*/true); - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->v2_type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, Federation) { - ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); - constexpr char kAuthority[] = "xds.example.com"; - const std::string kXdstpResourceName = absl::StrCat( - "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); - FakeXdsBootstrap::FakeXdsServer authority_server; - authority_server.set_server_uri("other_xds_server"); - FakeXdsBootstrap::FakeAuthority authority; - authority.set_server(authority_server); - InitXdsClient( - FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream to the top-level xDS server. - auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for the xdstp resource name. - auto watcher2 = StartFooWatch(kXdstpResourceName); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher2->HasEvent()); - // XdsClient will create a new stream to the server for this authority. - auto stream2 = WaitForAdsStream(authority_server); - ASSERT_TRUE(stream2 != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - // Note that version and nonce here do NOT use the values for Foo, - // since each authority has its own state. - request = WaitForRequest(stream2.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream2->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, kXdstpResourceName); - EXPECT_EQ(resource->value, 3); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream2.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - // Now cancel watch for xdstp resource name. - CancelFooWatch(watcher2.get(), kXdstpResourceName); - // The XdsClient may or may not send the unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream2.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) { - ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); - constexpr char kAuthority[] = "xds.example.com"; - const std::string kXdstpResourceName = absl::StrCat( - "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); - // Authority does not specify any xDS servers, so XdsClient will use - // the top-level xDS server in the bootstrap config for this authority. - InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority( - kAuthority, FakeXdsBootstrap::FakeAuthority())); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream to the top-level xDS server. - auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for the xdstp resource name. - auto watcher2 = StartFooWatch(kXdstpResourceName); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher2->HasEvent()); - // XdsClient will send a subscription request on the ADS stream that - // includes both resources, since both are being obtained from the - // same server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", kXdstpResourceName}); - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, kXdstpResourceName); - EXPECT_EQ(resource->value, 3); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", kXdstpResourceName}); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - // Now cancel watch for xdstp resource name. - CancelFooWatch(watcher2.get(), kXdstpResourceName); - // The XdsClient may or may not send the unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, FederationWithUnknownAuthority) { - ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); - constexpr char kAuthority[] = "xds.example.com"; - const std::string kXdstpResourceName = absl::StrCat( - "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); - // Note: Not adding authority to bootstrap config. - InitXdsClient(); - // Start a watch for the xdstp resource name. - auto watcher = StartFooWatch(kXdstpResourceName); - // Watcher should immediately get an error about the unknown authority. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "authority \"xds.example.com\" not present in bootstrap config") - << *error; -} - -TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) { - ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); - // Note: Not adding authority to bootstrap config. - InitXdsClient(); - // Start a watch for the xdstp resource name. - auto watcher = StartFooWatch("xdstp://x"); - // Watcher should immediately get an error about the unknown authority. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), "Unable to parse resource name xdstp://x") - << *error; -} - -TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) { - // We will use this xdstp name, whose authority is not present in - // the bootstrap config. But since federation is not enabled, we - // will treat this as an opaque old-style name, so we'll send it to - // the default server. - constexpr char kXdstpResourceName[] = - "xdstp://xds.example.com/test.v3.foo/foo1"; - InitXdsClient(); - // Start a watch for the xdstp name. - auto watcher = StartFooWatch(kXdstpResourceName); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{kXdstpResourceName, 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, kXdstpResourceName); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - // Cancel watch. - CancelFooWatch(watcher.get(), kXdstpResourceName); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) { - ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); - constexpr char kAuthority[] = "xds.example.com"; - const std::string kXdstpResourceName = absl::StrCat( - "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); - FakeXdsBootstrap::FakeXdsServer authority_server; - authority_server.set_server_uri("other_xds_server"); - FakeXdsBootstrap::FakeAuthority authority; - authority.set_server(authority_server); - InitXdsClient( - FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream to the top-level xDS server. - auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource{"foo1", 6}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Start a watch for the xdstp resource name. - auto watcher2 = StartFooWatch(kXdstpResourceName); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher2->HasEvent()); - // XdsClient will create a new stream to the server for this authority. - auto stream2 = WaitForAdsStream(authority_server); - ASSERT_TRUE(stream2 != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - // Note that version and nonce here do NOT use the values for Foo, - // since each authority has its own state. - request = WaitForRequest(stream2.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream2->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("B") - .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, kXdstpResourceName); - EXPECT_EQ(resource->value, 3); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream2.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{kXdstpResourceName}); - // Now cause a channel failure on the stream to the authority's xDS server. - TriggerConnectionFailure(authority_server, - absl::UnavailableError("connection failed")); - // The watcher for the xdstp resource name should see the error. - auto error = watcher2->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server other_xds_server: connection failed " - "(node ID:xds_client_test)") - << *error; - // The watcher for "foo1" should not see any error. - EXPECT_FALSE(watcher->HasEvent()); - // Cancel watch for "foo1". - CancelFooWatch(watcher.get(), "foo1"); - // XdsClient should send an unsubscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - // Now cancel watch for xdstp resource name. - CancelFooWatch(watcher2.get(), kXdstpResourceName); - // The XdsClient may or may not send the unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream2.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -} // namespace -} // namespace testing -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; -} diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc deleted file mode 100644 index 12b57134545..00000000000 --- a/test/core/xds/xds_transport_fake.cc +++ /dev/null @@ -1,230 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include "test/core/xds/xds_transport_fake.h" - -#include -#include -#include - -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/lib/event_engine/default_event_engine.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "test/core/util/test_config.h" - -using grpc_event_engine::experimental::GetDefaultEventEngine; - -namespace grpc_core { - -// -// FakeXdsTransportFactory::FakeStreamingCall -// - -void FakeXdsTransportFactory::FakeStreamingCall::Orphan() { - { - MutexLock lock(&mu_); - // Can't call event_handler_->OnStatusReceived() or unref event_handler_ - // synchronously, since those operations will trigger code in - // XdsClient that acquires its mutex, but it was already holding its - // mutex when it called us, so it would deadlock. - GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), - status_sent = status_sent_]() mutable { - ExecCtx exec_ctx; - if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); - event_handler.reset(); - }); - status_sent_ = true; - } - transport_->RemoveStream(method_, this); - Unref(); -} - -void FakeXdsTransportFactory::FakeStreamingCall::SendMessage( - std::string payload) { - MutexLock lock(&mu_); - from_client_messages_.push_back(std::move(payload)); - cv_.Signal(); - // Can't call event_handler_->OnRequestSent() synchronously, since that - // operation will trigger code in XdsClient that acquires its mutex, but it - // was already holding its mutex when it called us, so it would deadlock. - GetDefaultEventEngine()->Run( - [event_handler = event_handler_->Ref()]() mutable { - ExecCtx exec_ctx; - event_handler->OnRequestSent(/*ok=*/true); - event_handler.reset(); - }); -} - -bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() { - MutexLock lock(&mu_); - return !from_client_messages_.empty(); -} - -absl::optional -FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient( - absl::Duration timeout) { - MutexLock lock(&mu_); - while (from_client_messages_.empty()) { - if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { - return absl::nullopt; - } - } - std::string payload = from_client_messages_.front(); - from_client_messages_.pop_front(); - return payload; -} - -void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient( - absl::string_view payload) { - ExecCtx exec_ctx; - RefCountedPtr event_handler; - { - MutexLock lock(&mu_); - event_handler = event_handler_->Ref(); - } - event_handler->OnRecvMessage(payload); -} - -void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient( - absl::Status status) { - ExecCtx exec_ctx; - RefCountedPtr event_handler; - { - MutexLock lock(&mu_); - if (status_sent_) return; - status_sent_ = true; - event_handler = event_handler_->Ref(); - } - event_handler->OnStatusReceived(std::move(status)); -} - -// -// FakeXdsTransportFactory::FakeXdsTransport -// - -void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure( - absl::Status status) { - RefCountedPtr on_connectivity_failure; - { - MutexLock lock(&mu_); - on_connectivity_failure = on_connectivity_failure_->Ref(); - } - ExecCtx exec_ctx; - if (on_connectivity_failure != nullptr) { - on_connectivity_failure->Run(std::move(status)); - } -} - -void FakeXdsTransportFactory::FakeXdsTransport::Orphan() { - { - MutexLock lock(&mu_); - // Can't destroy on_connectivity_failure_ synchronously, since that - // operation will trigger code in XdsClient that acquires its mutex, but - // it was already holding its mutex when it called us, so it would deadlock. - GetDefaultEventEngine()->Run([on_connectivity_failure = std::move( - on_connectivity_failure_)]() mutable { - ExecCtx exec_ctx; - on_connectivity_failure.reset(); - }); - } - Unref(); -} - -RefCountedPtr -FakeXdsTransportFactory::FakeXdsTransport::WaitForStream( - const char* method, absl::Duration timeout) { - MutexLock lock(&mu_); - auto it = active_calls_.find(method); - while (it == active_calls_.end() || it->second == nullptr) { - if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { - return nullptr; - } - it = active_calls_.find(method); - } - return it->second; -} - -void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream( - const char* method, FakeStreamingCall* call) { - MutexLock lock(&mu_); - auto it = active_calls_.find(method); - if (it != active_calls_.end() && it->second.get() == call) { - active_calls_.erase(it); - } -} - -OrphanablePtr -FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall( - const char* method, - std::unique_ptr event_handler) { - auto call = MakeOrphanable(Ref(), method, - std::move(event_handler)); - MutexLock lock(&mu_); - active_calls_[method] = call->Ref(); - cv_.Signal(); - return call; -} - -// -// FakeXdsTransportFactory -// - -constexpr char FakeXdsTransportFactory::kAdsMethod[]; -constexpr char FakeXdsTransportFactory::kAdsV2Method[]; - -OrphanablePtr -FakeXdsTransportFactory::Create( - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* /*status*/) { - auto transport = - MakeOrphanable(std::move(on_connectivity_failure)); - MutexLock lock(&mu_); - auto& entry = transport_map_[&server]; - GPR_ASSERT(entry == nullptr); - entry = transport->Ref(); - return transport; -} - -void FakeXdsTransportFactory::TriggerConnectionFailure( - const XdsBootstrap::XdsServer& server, absl::Status status) { - auto transport = GetTransport(server); - transport->TriggerConnectionFailure(std::move(status)); -} - -RefCountedPtr -FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server, - const char* method, - absl::Duration timeout) { - auto transport = GetTransport(server); - return transport->WaitForStream(method, timeout); -} - -RefCountedPtr -FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) { - MutexLock lock(&mu_); - RefCountedPtr transport = transport_map_[&server]; - GPR_ASSERT(transport != nullptr); - return transport; -} - -} // namespace grpc_core diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h deleted file mode 100644 index ed0d746c90e..00000000000 --- a/test/core/xds/xds_transport_fake.h +++ /dev/null @@ -1,180 +0,0 @@ -// -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H -#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H - -#include - -#include -#include -#include -#include - -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_transport.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/sync.h" - -namespace grpc_core { - -class FakeXdsTransportFactory : public XdsTransportFactory { - private: - class FakeXdsTransport; - - public: - static constexpr char kAdsMethod[] = - "/envoy.service.discovery.v3.AggregatedDiscoveryService/" - "StreamAggregatedResources"; - static constexpr char kAdsV2Method[] = - "/envoy.service.discovery.v2.AggregatedDiscoveryService/" - "StreamAggregatedResources"; - - class FakeStreamingCall : public XdsTransport::StreamingCall { - public: - FakeStreamingCall( - RefCountedPtr transport, const char* method, - std::unique_ptr event_handler) - : transport_(std::move(transport)), - method_(method), - event_handler_(MakeRefCounted( - std::move(event_handler))) {} - - void Orphan() override; - - using StreamingCall::Ref; // Make it public. - - bool HaveMessageFromClient(); - absl::optional WaitForMessageFromClient( - absl::Duration timeout); - - void SendMessageToClient(absl::string_view payload); - void MaybeSendStatusToClient(absl::Status status); - - private: - class RefCountedEventHandler : public RefCounted { - public: - explicit RefCountedEventHandler( - std::unique_ptr event_handler) - : event_handler_(std::move(event_handler)) {} - - void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); } - void OnRecvMessage(absl::string_view payload) { - event_handler_->OnRecvMessage(payload); - } - void OnStatusReceived(absl::Status status) { - event_handler_->OnStatusReceived(std::move(status)); - } - - private: - std::unique_ptr event_handler_; - }; - - void SendMessage(std::string payload) override; - - RefCountedPtr transport_; - const char* method_; - - Mutex mu_; - CondVar cv_; - RefCountedPtr event_handler_ ABSL_GUARDED_BY(&mu_); - std::deque from_client_messages_ ABSL_GUARDED_BY(&mu_); - bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; - }; - - FakeXdsTransportFactory() = default; - - using XdsTransportFactory::Ref; // Make it public. - - void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, - absl::Status status); - - RefCountedPtr WaitForStream( - const XdsBootstrap::XdsServer& server, const char* method, - absl::Duration timeout); - - void Orphan() override { Unref(); } - - private: - class FakeXdsTransport : public XdsTransport { - public: - explicit FakeXdsTransport( - std::function on_connectivity_failure) - : on_connectivity_failure_( - MakeRefCounted( - std::move(on_connectivity_failure))) {} - - void Orphan() override; - - using XdsTransport::Ref; // Make it public. - - void TriggerConnectionFailure(absl::Status status); - - RefCountedPtr WaitForStream(const char* method, - absl::Duration timeout); - - void RemoveStream(const char* method, FakeStreamingCall* call); - - private: - class RefCountedOnConnectivityFailure - : public RefCounted { - public: - explicit RefCountedOnConnectivityFailure( - std::function on_connectivity_failure) - : on_connectivity_failure_(std::move(on_connectivity_failure)) {} - - void Run(absl::Status status) { - on_connectivity_failure_(std::move(status)); - } - - private: - std::function on_connectivity_failure_; - }; - - OrphanablePtr CreateStreamingCall( - const char* method, - std::unique_ptr event_handler) override; - - void ResetBackoff() override {} - - Mutex mu_; - CondVar cv_; - RefCountedPtr on_connectivity_failure_ - ABSL_GUARDED_BY(&mu_); - std::map> - active_calls_ ABSL_GUARDED_BY(&mu_); - }; - - OrphanablePtr Create( - const XdsBootstrap::XdsServer& server, - std::function on_connectivity_failure, - absl::Status* status) override; - - RefCountedPtr GetTransport( - const XdsBootstrap::XdsServer& server); - - Mutex mu_; - std::map> - transport_map_ ABSL_GUARDED_BY(&mu_); -}; - -} // namespace grpc_core - -#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 16cfa1369b1..868e0b6966f 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -87,10 +87,10 @@ TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) { const auto response_state = WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ(response_state->error_message, - "xDS response validation errors: [resource index 0: cluster_name: " - "INVALID_ARGUMENT: errors parsing CDS resource: [" - "DiscoveryType is not valid.]]"); + EXPECT_THAT(response_state->error_message, + ::testing::ContainsRegex(absl::StrCat( + kDefaultClusterName, + ": validation error.*DiscoveryType is not valid"))); CheckRpcSendOk(DEBUG_LOCATION); } diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc index 6dadb621130..38cc2bc7c79 100644 --- a/test/cpp/end2end/xds/xds_core_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc @@ -182,18 +182,19 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) { ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; EXPECT_EQ( response_state->error_message, - absl::StrCat("xDS response validation errors: [" - "resource index 0: Can't decode Resource proto wrapper; ", - "resource index 1: foo: " - "INVALID_ARGUMENT: Can't parse Cluster resource.; " - "resource index 3: ", - kClusterName2, - ": INVALID_ARGUMENT: errors parsing CDS resource: " - "[DiscoveryType is not valid.]; " - "resource index 4: ", - kClusterName3, - ": INVALID_ARGUMENT: errors parsing CDS resource: " - "[DiscoveryType is not valid.]]")); + absl::StrCat( + "xDS response validation errors: [" + "resource index 0: Can't decode Resource proto wrapper; ", + "resource index 1: foo: " + "INVALID_ARGUMENT: Can't parse Cluster resource.; " + "resource index 3: ", + kClusterName2, + ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]; " + "resource index 4: ", + kClusterName3, + ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]]")); // RPCs for default cluster should succeed. std::vector> metadata_default_cluster = { {"cluster", kDefaultClusterName}, @@ -311,10 +312,11 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) { balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); const auto response_state = WaitForLdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ(response_state->error_message, - "xDS response validation errors: [" - "resource index 0: server.example.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); + EXPECT_THAT(response_state->error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*"))); // Need to create a second channel to subscribe to a second LDS resource. auto channel2 = CreateChannel(0, kServerName2); auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); @@ -328,13 +330,16 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) { // Wait for second NACK to be reported to xDS server. const auto response_state = WaitForLdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ( - response_state->error_message, - "xDS response validation errors: [" - "resource index 0: server.other.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener; " - "resource index 1: server.example.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); + EXPECT_THAT(response_state->error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*"))); + EXPECT_THAT(response_state->error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName2, + ": validation error.*" + "Listener has neither address nor ApiListener.*"))); } // Now start a new channel with a third server name, this one with a // valid resource. @@ -366,10 +371,11 @@ TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) { const auto response_state = WaitForLdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_EQ(response_state->error_message, - "xDS response validation errors: [" - "resource index 0: server.example.com: " - "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); + EXPECT_THAT(response_state->error_message, + ::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener"))); CheckRpcSendOk(DEBUG_LOCATION); } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 57f30f9ed27..cc19dea0b7f 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7891,30 +7891,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "xds_client_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false,