diff --git a/CMakeLists.txt b/CMakeLists.txt index 819520a408d..1e750b36992 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1245,6 +1245,7 @@ if(gRPC_BUILD_TESTS) endif() add_dependencies(buildtests_cxx xds_bootstrap_test) add_dependencies(buildtests_cxx xds_certificate_provider_test) + add_dependencies(buildtests_cxx xds_client_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_cluster_end2end_test) endif() @@ -19847,6 +19848,54 @@ target_link_libraries(xds_certificate_provider_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(xds_client_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + test/core/xds/xds_client_test.cc + test/core/xds/xds_transport_fake.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(xds_client_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_client_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 37dff643a26..26a526072be 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10833,6 +10833,21 @@ targets: - test/core/xds/xds_certificate_provider_test.cc deps: - grpc_test_util +- name: xds_client_test + gtest: true + build: test + language: c++ + headers: + - test/core/xds/xds_transport_fake.h + src: + - src/proto/grpc/testing/xds/v3/base.proto + - src/proto/grpc/testing/xds/v3/discovery.proto + - src/proto/grpc/testing/xds/v3/percent.proto + - test/core/xds/xds_client_test.cc + - test/core/xds/xds_transport_fake.cc + deps: + - grpc_test_util + uses_polling: false - name: xds_cluster_end2end_test gtest: true build: test diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 030574d5140..3f3bc8e33fd 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -433,11 +433,11 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, server, [self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")]( absl::Status status) { - self->OnConnectivityStateChange(std::move(status)); + self->OnConnectivityFailure(std::move(status)); }, &status); GPR_ASSERT(transport_ != nullptr); - if (!status.ok()) OnConnectivityStateChangeLocked(std::move(status)); + if (!status.ok()) SetChannelStatusLocked(std::move(status)); } XdsClient::ChannelState::~ChannelState() { @@ -475,10 +475,6 @@ XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld() return lrs_calld_->calld(); } -bool XdsClient::ChannelState::HasActiveAdsCall() const { - return ads_calld_ != nullptr && ads_calld_->calld() != nullptr; -} - void XdsClient::ChannelState::MaybeStartLrsCall() { if (lrs_calld_ != nullptr) return; lrs_calld_.reset(new RetryableCall( @@ -522,27 +518,54 @@ void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type, } } -void XdsClient::ChannelState::OnConnectivityStateChange(absl::Status status) { +void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) { { MutexLock lock(&xds_client_->mu_); - OnConnectivityStateChangeLocked(std::move(status)); + SetChannelStatusLocked(std::move(status)); } xds_client_->work_serializer_.DrainQueue(); } -void XdsClient::ChannelState::OnConnectivityStateChangeLocked( - absl::Status status) { - if (!shutting_down_) { - // Notify all watchers of error. - gpr_log(GPR_INFO, - "[xds_client %p] xds channel for server %s in " - "state TRANSIENT_FAILURE: %s", - xds_client(), server_.server_uri().c_str(), - status.ToString().c_str()); - xds_client_->NotifyOnErrorLocked(absl::UnavailableError( - absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ", - status.ToString()))); +void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) { + if (shutting_down_) return; + status = absl::Status(status.code(), absl::StrCat("xDS channel for server ", + server_.server_uri(), ": ", + status.message())); + gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(), + status.ToString().c_str()); + // If the node ID is set, append that to the status message that we send to + // the watchers, so that it will appear in log messages visible to users. + const auto* node = xds_client_->bootstrap_->node(); + if (node != nullptr) { + status = absl::Status( + status.code(), + absl::StrCat(status.message(), + " (node ID:", xds_client_->bootstrap_->node()->id(), ")")); } + // Save status in channel, so that we can immediately generate an + // error for any new watchers that may be started. + status_ = status; + // Find all watchers for this channel. + std::set> watchers; + for (const auto& a : xds_client_->authority_state_map_) { // authority + if (a.second.channel_state != this) continue; + for (const auto& t : a.second.resource_map) { // type + for (const auto& r : t.second) { // resource id + for (const auto& w : r.second.watchers) { // watchers + watchers.insert(w.second); + } + } + } + } + // Enqueue notification for the watchers. + xds_client_->work_serializer_.Schedule( + [watchers = std::move(watchers), status = std::move(status)]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) { + for (const auto& watcher : watchers) { + watcher->OnError(status); + } + }, + DEBUG_LOCATION); } // @@ -709,19 +732,32 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( XdsResourceType::DecodeContext context = { xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace, xds_client()->symtab_.ptr(), arena}; - absl::StatusOr result = + absl::StatusOr decode_result = result_.type->Decode(context, serialized_resource, is_v2); - if (!result.ok()) { + // If we didn't already have the resource name from the Resource + // wrapper, try to get it from the decoding result. + if (resource_name.empty()) { + if (decode_result.ok()) { + resource_name = decode_result->name; + error_prefix = + absl::StrCat("resource index ", idx, ": ", resource_name, ": "); + } else { + // We don't have any way of determining the resource name, so + // there's nothing more we can do here. + result_.errors.emplace_back( + absl::StrCat(error_prefix, decode_result.status().ToString())); + return; + } + } + // If decoding failed, make sure we include the error in the NACK. + const absl::Status& decode_status = decode_result.ok() + ? decode_result->resource.status() + : decode_result.status(); + if (!decode_status.ok()) { result_.errors.emplace_back( - absl::StrCat(error_prefix, result.status().ToString())); - return; + absl::StrCat(error_prefix, decode_status.ToString())); } // Check the resource name. - if (resource_name.empty()) { - resource_name = result->name; - error_prefix = - absl::StrCat("resource index ", idx, ": ", resource_name, ": "); - } auto parsed_resource_name = xds_client()->ParseXdsResourceName(resource_name, result_.type); if (!parsed_resource_name.ok()) { @@ -778,16 +814,12 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( resource_state.ignored_deletion = false; } // Update resource state based on whether the resource is valid. - if (!result->resource.ok()) { - result_.errors.emplace_back(absl::StrCat( - error_prefix, - "validation error: ", result->resource.status().ToString())); + if (!decode_status.ok()) { xds_client()->NotifyWatchersOnErrorLocked( resource_state.watchers, - absl::UnavailableError(absl::StrCat( - "invalid resource: ", result->resource.status().ToString()))); - UpdateResourceMetadataNacked(result_.version, - result->resource.status().ToString(), + absl::UnavailableError( + absl::StrCat("invalid resource: ", decode_status.ToString()))); + UpdateResourceMetadataNacked(result_.version, decode_status.ToString(), update_time_, &resource_state.meta); return; } @@ -796,7 +828,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( // If it didn't change, ignore it. if (resource_state.resource != nullptr && result_.type->ResourcesEqual(resource_state.resource.get(), - result->resource->get())) { + decode_result->resource->get())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] %s resource %s identical to current, ignoring.", @@ -806,7 +838,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( return; } // Update the resource state. - resource_state.resource = std::move(*result->resource); + resource_state.resource = std::move(*decode_result->resource); resource_state.meta = CreateResourceMetadataAcked( std::string(serialized_resource), result_.version, update_time_); // Notify watchers. @@ -987,11 +1019,12 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage( status.ToString().c_str()); } else { seen_response_ = true; + chand()->status_ = absl::OkStatus(); AdsResponseParser::Result result = parser.TakeResult(); // Update nonce. auto& state = state_map_[result.type]; state.nonce = result.nonce; - // If we got an error, set state.error so that we'll NACK the update. + // If we got an error, set state.status so that we'll NACK the update. if (!result.errors.empty()) { state.status = absl::UnavailableError( absl::StrCat("xDS response validation errors: [", @@ -1084,10 +1117,9 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( if (IsCurrentCallOnChannel()) { // Try to restart the call. parent_->OnCallFinishedLocked(); - // Send error to all watchers. - xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat( - "xDS call failed: xDS server: %s, ADS call status: %s", - chand()->server_.server_uri(), status.ToString().c_str()))); + // Send error to all watchers for the channel. + chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat( + "xDS call failed; status: %s", status.ToString().c_str()))); } } xds_client()->work_serializer_.DrainQueue(); @@ -1461,17 +1493,16 @@ void XdsClient::WatchResource(const XdsResourceType* type, invalid_watchers_[w] = watcher; } work_serializer_.Run( - // TODO(yashykt): When we move to C++14, capture watcher using - // std::move() - [watcher, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - watcher->OnError(status); - }, + [watcher = std::move(watcher), status = std::move(status)]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnError(status); + }, DEBUG_LOCATION); }; auto resource_name = ParseXdsResourceName(name, type); if (!resource_name.ok()) { - fail(absl::UnavailableError(absl::StrFormat( - "Unable to parse resource name for listener %s", name))); + fail(absl::UnavailableError( + absl::StrCat("Unable to parse resource name ", name))); return; } // Find server to use. @@ -1511,6 +1542,39 @@ void XdsClient::WatchResource(const XdsResourceType* type, delete value; }, DEBUG_LOCATION); + } else if (resource_state.meta.client_status == + XdsApi::ResourceMetadata::DOES_NOT_EXIST) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] reporting cached does-not-exist for %s", this, + std::string(name).c_str()); + } + work_serializer_.Schedule( + [watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnResourceDoesNotExist(); + }, + DEBUG_LOCATION); + } else if (resource_state.meta.client_status == + XdsApi::ResourceMetadata::NACKED) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log( + GPR_INFO, + "[xds_client %p] reporting cached validation failure for %s: %s", + this, std::string(name).c_str(), + resource_state.meta.failed_details.c_str()); + } + std::string details = resource_state.meta.failed_details; + const auto* node = bootstrap_->node(); + if (node != nullptr) { + absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")"); + } + work_serializer_.Schedule( + [watcher, details = std::move(details)]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnError(absl::UnavailableError( + absl::StrCat("invalid resource: ", details))); + }, + DEBUG_LOCATION); } // If the authority doesn't yet have a channel, set it, creating it if // needed. @@ -1518,6 +1582,21 @@ void XdsClient::WatchResource(const XdsResourceType* type, authority_state.channel_state = GetOrCreateChannelStateLocked(*xds_server, "start watch"); } + absl::Status channel_status = authority_state.channel_state->status(); + if (!channel_status.ok()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached channel error for %s: %s", + this, std::string(name).c_str(), + channel_status.ToString().c_str()); + } + work_serializer_.Schedule( + [watcher = std::move(watcher), status = std::move(channel_status)]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable { + watcher->OnError(std::move(status)); + }, + DEBUG_LOCATION); + } authority_state.channel_state->SubscribeLocked(type, *resource_name); } work_serializer_.DrainQueue(); @@ -1781,34 +1860,6 @@ void XdsClient::ResetBackoff() { } } -void XdsClient::NotifyOnErrorLocked(absl::Status status) { - const auto* node = bootstrap_->node(); - if (node != nullptr) { - status = absl::Status( - status.code(), - absl::StrCat(status.message(), " (node ID:", node->id(), ")")); - } - std::set> watchers; - for (const auto& a : authority_state_map_) { // authority - for (const auto& t : a.second.resource_map) { // type - for (const auto& r : t.second) { // resource id - for (const auto& w : r.second.watchers) { // watchers - watchers.insert(w.second); - } - } - } - } - work_serializer_.Schedule( - // TODO(yashykt): When we move to C++14, capture watchers using - // std::move() - [watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - for (const auto& watcher : watchers) { - watcher->OnError(status); - } - }, - DEBUG_LOCATION); -} - void XdsClient::NotifyWatchersOnErrorLocked( const std::map>& watchers, @@ -1820,11 +1871,12 @@ void XdsClient::NotifyWatchersOnErrorLocked( absl::StrCat(status.message(), " (node ID:", node->id(), ")")); } work_serializer_.Schedule( - [watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - for (const auto& p : watchers) { - p.first->OnError(status); - } - }, + [watchers, status = std::move(status)]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + for (const auto& p : watchers) { + p.first->OnError(status); + } + }, DEBUG_LOCATION); } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index b5fbabb9651..b936e9ac82c 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -186,8 +186,9 @@ class XdsClient : public DualRefCounted { void MaybeStartLrsCall(); void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - bool HasAdsCall() const; - bool HasActiveAdsCall() const; + // Returns non-OK if there has been an error since the last time the + // ADS stream saw a response. + const absl::Status& status() const { return status_; } void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) @@ -198,8 +199,11 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); private: - void OnConnectivityStateChange(absl::Status status); - void OnConnectivityStateChangeLocked(absl::Status status) + void OnConnectivityFailure(absl::Status status); + + // Enqueues error notifications to watchers. Caller must drain + // XdsClient::work_serializer_ after releasing the lock. + void SetChannelStatusLocked(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); // The owning xds client. @@ -218,6 +222,8 @@ class XdsClient : public DualRefCounted { // Stores the most recent accepted resource version for each resource type. std::map resource_type_version_map_; + + absl::Status status_; }; struct ResourceState { @@ -259,9 +265,6 @@ class XdsClient : public DualRefCounted { LoadReportMap load_report_map; }; - // Sends an error notification to all watchers. - void NotifyOnErrorLocked(absl::Status status) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Sends an error notification to a specific set of watchers. void NotifyWatchersOnErrorLocked( const std::map #include +#include "absl/strings/str_cat.h" + #include #include #include @@ -235,7 +237,9 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - on_connectivity_failure_(status); + on_connectivity_failure_(absl::Status( + status.code(), + absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message()))); } } diff --git a/src/core/lib/json/json_object_loader.h b/src/core/lib/json/json_object_loader.h index 2fbec033136..baf61afbbcf 100644 --- a/src/core/lib/json/json_object_loader.h +++ b/src/core/lib/json/json_object_loader.h @@ -51,7 +51,7 @@ // struct Foo { // int a; // int b; -// static const JsonLoaderInterface* JsonLoader() { +// static const JsonLoaderInterface* JsonLoader(const JsonArgs& args) { // // Note: Field names must be string constants; they are not copied. // static const auto* loader = JsonObjectLoader() // .Field("a", &Foo::a) @@ -60,7 +60,10 @@ // return loader; // } // // Optional; omit if no post-processing needed. -// void JsonPostLoad(const Json& source, ErrorList* errors) { ++a; } +// void JsonPostLoad(const Json& source, const JsonArgs& args, +// ErrorList* errors) { +// ++a; +// } // }; // Now we can load Foo objects from JSON: // absl::StatusOr foo = LoadFromJson(json); diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD index 789fdbc4562..e77e713d6a2 100644 --- a/test/core/xds/BUILD +++ b/test/core/xds/BUILD @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") +load( + "//bazel:grpc_build_system.bzl", + "grpc_cc_library", + "grpc_cc_test", + "grpc_package", +) grpc_package(name = "test/core/xds") @@ -116,3 +121,36 @@ grpc_cc_test( "//test/cpp/util:grpc_cli_utils", ], ) + +grpc_cc_library( + name = "xds_transport_fake", + testonly = True, + srcs = ["xds_transport_fake.cc"], + hdrs = ["xds_transport_fake.h"], + external_deps = [ + "absl/strings", + "absl/types:optional", + ], + language = "C++", + deps = [ + "//:orphanable", + "//:ref_counted_ptr", + "//:xds_client", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "xds_client_test", + srcs = ["xds_client_test.cc"], + external_deps = ["gtest"], + language = "C++", + uses_event_engine = True, + uses_polling = False, + deps = [ + ":xds_transport_fake", + "//:xds_client", + "//src/proto/grpc/testing/xds/v3:discovery_proto", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc new file mode 100644 index 00000000000..65ae89f055d --- /dev/null +++ b/test/core/xds/xds_client_test.cc @@ -0,0 +1,2303 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// TODO(roth): Add the following tests: +// - tests for DumpClientConfigBinary() +// - tests for load-reporting APIs? (or maybe move those out of XdsClient?) + +#include "src/core/ext/xds/xds_client.h" + +#include +#include +#include +#include + +#include +#include + +#include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/ext/xds/xds_resource_type_impl.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/json/json_object_loader.h" +#include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h" +#include "test/core/util/test_config.h" +#include "test/core/xds/xds_transport_fake.h" + +using envoy::service::discovery::v3::DiscoveryRequest; +using envoy::service::discovery::v3::DiscoveryResponse; + +namespace grpc_core { +namespace testing { +namespace { + +class XdsClientTest : public ::testing::Test { + protected: + // A fake bootstrap implementation that allows tests to populate the + // fields however they want. + class FakeXdsBootstrap : public XdsBootstrap { + public: + class FakeNode : public Node { + public: + const std::string& id() const override { return id_; } + const std::string& cluster() const override { return cluster_; } + const std::string& locality_region() const override { + return locality_region_; + } + const std::string& locality_zone() const override { + return locality_zone_; + } + const std::string& locality_sub_zone() const override { + return locality_sub_zone_; + } + const Json::Object& metadata() const override { return metadata_; } + + void set_id(std::string id) { id_ = std::move(id); } + void set_cluster(std::string cluster) { cluster_ = std::move(cluster); } + void set_locality_region(std::string locality_region) { + locality_region_ = std::move(locality_region); + } + void set_locality_zone(std::string locality_zone) { + locality_zone_ = std::move(locality_zone); + } + void set_locality_sub_zone(std::string locality_sub_zone) { + locality_sub_zone_ = std::move(locality_sub_zone); + } + void set_metadata(Json::Object metadata) { + metadata_ = std::move(metadata); + } + + private: + std::string id_ = "xds_client_test"; + std::string cluster_; + std::string locality_region_; + std::string locality_zone_; + std::string locality_sub_zone_; + Json::Object metadata_; + }; + + class FakeXdsServer : public XdsServer { + public: + const std::string& server_uri() const override { return server_uri_; } + bool ShouldUseV3() const override { return use_v3_; } + bool IgnoreResourceDeletion() const override { + return ignore_resource_deletion_; + } + bool Equals(const XdsServer& other) const override { + const auto& o = static_cast(other); + return server_uri_ == o.server_uri_ && use_v3_ == o.use_v3_ && + ignore_resource_deletion_ == o.ignore_resource_deletion_; + } + + void set_server_uri(std::string server_uri) { + server_uri_ = std::move(server_uri); + } + void set_use_v3(bool use_v3) { use_v3_ = use_v3; } + void set_ignore_resource_deletion(bool ignore_resource_deletion) { + ignore_resource_deletion_ = ignore_resource_deletion; + } + + private: + std::string server_uri_ = "default_xds_server"; + bool use_v3_ = true; + bool ignore_resource_deletion_ = false; + }; + + class FakeAuthority : public Authority { + public: + const XdsServer* server() const override { + return server_.has_value() ? &*server_ : nullptr; + } + + void set_server(absl::optional server) { + server_ = std::move(server); + } + + private: + absl::optional server_; + }; + + class Builder { + public: + Builder() { node_.emplace(); } + + Builder& set_use_v2() { + server_.set_use_v3(false); + return *this; + } + Builder& set_node_id(std::string id) { + if (!node_.has_value()) node_.emplace(); + node_->set_id(std::move(id)); + return *this; + } + Builder& AddAuthority(std::string name, FakeAuthority authority) { + authorities_[std::move(name)] = std::move(authority); + return *this; + } + std::unique_ptr Build() { + auto bootstrap = absl::make_unique(); + bootstrap->server_ = std::move(server_); + bootstrap->node_ = std::move(node_); + bootstrap->authorities_ = std::move(authorities_); + return bootstrap; + } + + private: + FakeXdsServer server_; + absl::optional node_; + std::map authorities_; + }; + + std::string ToString() const override { return ""; } + + const XdsServer& server() const override { return server_; } + const Node* node() const override { + return node_.has_value() ? &*node_ : nullptr; + } + const Authority* LookupAuthority(const std::string& name) const override { + auto it = authorities_.find(name); + if (it == authorities_.end()) return nullptr; + return &it->second; + } + const XdsServer* FindXdsServer(const XdsServer& server) const override { + const auto& fake_server = static_cast(server); + if (fake_server == server_) return &server_; + for (const auto& p : authorities_) { + const auto* authority_server = + static_cast(p.second.server()); + if (authority_server != nullptr && *authority_server == fake_server) { + return authority_server; + } + } + return nullptr; + } + + private: + FakeXdsServer server_; + absl::optional node_; + std::map authorities_; + }; + + // A template for a test xDS resource type with an associated watcher impl. + // For simplicity, we use JSON instead of proto for serialization. + // The specified ResourceStruct must provide the following: + // - a static JsonLoader() method, as described in json_object_loader.h + // - an AsJsonString() method that returns the object in JSON string form + // - a static TypeUrl() method that returns the V3 resource type + // - a static TypeUrlV2() method that returns the V2 resource type + template + class XdsTestResourceType + : public XdsResourceTypeImpl, + ResourceStruct> { + public: + // A watcher implementation that queues delivered watches. + class Watcher + : public XdsResourceTypeImpl, + ResourceStruct>::WatcherInterface { + public: + bool ExpectNoEvent(absl::Duration timeout) { + MutexLock lock(&mu_); + return !WaitForEventLocked(timeout); + } + + bool HasEvent() { + MutexLock lock(&mu_); + return !queue_.empty(); + } + + absl::optional WaitForNextResource( + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return absl::nullopt; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) + ? "error" + : "does-not-exist") + << " at " << location.file() << ":" << location.line(); + return absl::nullopt; + } + ResourceStruct foo = std::move(absl::get(event)); + queue_.pop_front(); + return std::move(foo); + } + + absl::optional WaitForNextError( + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return absl::nullopt; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) + ? "resource" + : "does-not-exist") + << " at " << location.file() << ":" << location.line(); + return absl::nullopt; + } + absl::Status error = std::move(absl::get(event)); + queue_.pop_front(); + return std::move(error); + } + + bool WaitForDoesNotExist(absl::Duration timeout, + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return false; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) ? "resource" + : "error") + << " at " << location.file() << ":" << location.line(); + return false; + } + queue_.pop_front(); + return true; + } + + private: + struct DoesNotExist {}; + using Event = absl::variant; + + void OnResourceChanged(ResourceStruct foo) override { + MutexLock lock(&mu_); + queue_.push_back(std::move(foo)); + cv_.Signal(); + } + void OnError(absl::Status status) override { + MutexLock lock(&mu_); + queue_.push_back(std::move(status)); + cv_.Signal(); + } + void OnResourceDoesNotExist() override { + MutexLock lock(&mu_); + queue_.push_back(DoesNotExist()); + cv_.Signal(); + } + + bool WaitForEventLocked(absl::Duration timeout) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { + while (queue_.empty()) { + if (cv_.WaitWithTimeout(&mu_, + timeout * grpc_test_slowdown_factor())) { + return false; + } + } + return true; + } + + Mutex mu_; + CondVar cv_; + std::deque queue_ ABSL_GUARDED_BY(&mu_); + }; + + absl::string_view type_url() const override { + return ResourceStruct::TypeUrl(); + } + absl::string_view v2_type_url() const override { + return ResourceStruct::TypeUrlV2(); + } + absl::StatusOr Decode( + const XdsResourceType::DecodeContext& /*context*/, + absl::string_view serialized_resource, bool /*is_v2*/) const override { + auto json = Json::Parse(serialized_resource); + if (!json.ok()) return json.status(); + absl::StatusOr foo = LoadFromJson(*json); + XdsResourceType::DecodeResult result; + if (!foo.ok()) { + auto it = json->object_value().find("name"); + if (it == json->object_value().end()) { + return absl::InvalidArgumentError( + "cannot determine name for invalid resource"); + } + result.name = it->second.string_value(); + result.resource = foo.status(); + } else { + result.name = foo->name; + auto resource = absl::make_unique, + ResourceStruct>::ResourceDataSubclass>(); + resource->resource = std::move(*foo); + result.resource = std::move(resource); + } + return std::move(result); + } + void InitUpbSymtab(upb_DefPool* /*symtab*/) const override {} + + static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) { + google::protobuf::Any any; + any.set_type_url( + absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl())); + any.set_value(resource.AsJsonString()); + return any; + } + }; + + // A fake "Foo" xDS resource type. + struct XdsFooResource { + std::string name; + uint32_t value; + + bool operator==(const XdsFooResource& other) const { + return name == other.name && value == other.value; + } + + std::string AsJsonString() const { + return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}"); + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + static const auto* loader = JsonObjectLoader() + .Field("name", &XdsFooResource::name) + .Field("value", &XdsFooResource::value) + .Finish(); + return loader; + } + + static absl::string_view TypeUrl() { return "test.v3.foo"; } + static absl::string_view TypeUrlV2() { return "test.v2.foo"; } + }; + using XdsFooResourceType = XdsTestResourceType; + + // A fake "Bar" xDS resource type. + struct XdsBarResource { + std::string name; + std::string value; + + bool operator==(const XdsBarResource& other) const { + return name == other.name && value == other.value; + } + + std::string AsJsonString() const { + return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, + "\"}"); + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + static const auto* loader = JsonObjectLoader() + .Field("name", &XdsBarResource::name) + .Field("value", &XdsBarResource::value) + .Finish(); + return loader; + } + + static absl::string_view TypeUrl() { return "test.v3.bar"; } + static absl::string_view TypeUrlV2() { return "test.v2.bar"; } + }; + using XdsBarResourceType = XdsTestResourceType; + + // A helper class to build and serialize a DiscoveryResponse. + class ResponseBuilder { + public: + explicit ResponseBuilder(absl::string_view type_url) { + response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url)); + } + + ResponseBuilder& set_version_info(absl::string_view version_info) { + response_.set_version_info(std::string(version_info)); + return *this; + } + ResponseBuilder& set_nonce(absl::string_view nonce) { + response_.set_nonce(std::string(nonce)); + return *this; + } + + template + ResponseBuilder& AddResource( + const decltype(ResourceType::ResourceDataSubclass::resource)& resource, + bool in_resource_wrapper = false) { + auto* res = response_.add_resources(); + *res = ResourceType::EncodeAsAny(resource); + if (in_resource_wrapper) { + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name(resource.name); + *resource_wrapper.mutable_resource() = std::move(*res); + res->PackFrom(resource_wrapper); + } + return *this; + } + + ResponseBuilder& AddFooResource(const XdsFooResource& resource, + bool in_resource_wrapper = false) { + return AddResource(resource, in_resource_wrapper); + } + + ResponseBuilder& AddBarResource(const XdsBarResource& resource, + bool in_resource_wrapper = false) { + return AddResource(resource, in_resource_wrapper); + } + + ResponseBuilder& AddInvalidResource( + absl::string_view type_url, absl::string_view value, + absl::string_view resource_wrapper_name = "") { + auto* res = response_.add_resources(); + res->set_type_url(absl::StrCat("type.googleapis.com/", type_url)); + res->set_value(std::string(value)); + if (!resource_wrapper_name.empty()) { + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name(std::string(resource_wrapper_name)); + *resource_wrapper.mutable_resource() = std::move(*res); + res->PackFrom(resource_wrapper); + } + return *this; + } + + std::string Serialize() { + std::string serialized_response; + EXPECT_TRUE(response_.SerializeToString(&serialized_response)); + return serialized_response; + } + + private: + DiscoveryResponse response_; + }; + + class ScopedExperimentalEnvVar { + public: + explicit ScopedExperimentalEnvVar(const char* env_var) : env_var_(env_var) { + gpr_setenv(env_var_, "true"); + } + + ~ScopedExperimentalEnvVar() { gpr_unsetenv(env_var_); } + + private: + const char* env_var_; + }; + + // Sets transport_factory_ and initializes xds_client_ with the + // specified bootstrap config. + void InitXdsClient( + FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), + Duration resource_request_timeout = Duration::Seconds(15) * + grpc_test_slowdown_factor()) { + auto transport_factory = MakeOrphanable(); + transport_factory_ = transport_factory->Ref(); + xds_client_ = MakeRefCounted(bootstrap_builder.Build(), + std::move(transport_factory), + resource_request_timeout); + } + + // Starts and cancels a watch for a Foo resource. + RefCountedPtr StartFooWatch( + absl::string_view resource_name) { + auto watcher = MakeRefCounted(); + XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); + return watcher; + } + void CancelFooWatch(XdsFooResourceType::Watcher* watcher, + absl::string_view resource_name, + bool delay_unsubscription = false) { + XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, + delay_unsubscription); + } + + // Starts and cancels a watch for a Bar resource. + RefCountedPtr StartBarWatch( + absl::string_view resource_name) { + auto watcher = MakeRefCounted(); + XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); + return watcher; + } + void CancelBarWatch(XdsBarResourceType::Watcher* watcher, + absl::string_view resource_name, + bool delay_unsubscription = false) { + XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, + delay_unsubscription); + } + + RefCountedPtr WaitForAdsStream( + const XdsBootstrap::XdsServer& server, + absl::Duration timeout = absl::Seconds(5)) { + const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); + GPR_ASSERT(xds_server != nullptr); + return transport_factory_->WaitForStream( + *xds_server, FakeXdsTransportFactory::kAdsMethod, + timeout * grpc_test_slowdown_factor()); + } + + void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, + absl::Status status) { + const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); + GPR_ASSERT(xds_server != nullptr); + transport_factory_->TriggerConnectionFailure(*xds_server, status); + } + + RefCountedPtr WaitForAdsStream( + absl::Duration timeout = absl::Seconds(5)) { + return WaitForAdsStream(xds_client_->bootstrap().server(), timeout); + } + + // Gets the latest request sent to the fake xDS server. + absl::optional WaitForRequest( + FakeXdsTransportFactory::FakeStreamingCall* stream, + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + auto message = + stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor()); + if (!message.has_value()) return absl::nullopt; + DiscoveryRequest request; + bool success = request.ParseFromString(*message); + EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at " + << location.file() << ":" << location.line(); + if (!success) return absl::nullopt; + return std::move(request); + } + + // Helper function to check the fields of a DiscoveryRequest. + void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url, + absl::string_view version_info, + absl::string_view response_nonce, absl::Status error_detail, + std::set resource_names, + SourceLocation location = SourceLocation()) { + EXPECT_EQ(request.type_url(), + absl::StrCat("type.googleapis.com/", type_url)) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.version_info(), version_info) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.response_nonce(), response_nonce) + << location.file() << ":" << location.line(); + if (error_detail.ok()) { + EXPECT_FALSE(request.has_error_detail()) + << location.file() << ":" << location.line(); + } else { + EXPECT_EQ(request.error_detail().code(), + static_cast(error_detail.code())) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.error_detail().message(), error_detail.message()) + << location.file() << ":" << location.line(); + } + EXPECT_THAT(request.resource_names(), + ::testing::UnorderedElementsAreArray(resource_names)) + << location.file() << ":" << location.line(); + } + + // Helper function to check the contents of the node message in a + // request against the client's node info. + void CheckRequestNode(const DiscoveryRequest& request, + bool check_build_version = false, + SourceLocation location = SourceLocation()) { + // These fields come from the bootstrap config. + EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().cluster(), + xds_client_->bootstrap().node()->cluster()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().region(), + xds_client_->bootstrap().node()->locality_region()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().zone(), + xds_client_->bootstrap().node()->locality_zone()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().sub_zone(), + xds_client_->bootstrap().node()->locality_sub_zone()) + << location.file() << ":" << location.line(); + if (xds_client_->bootstrap().node()->metadata().empty()) { + EXPECT_FALSE(request.node().has_metadata()) + << location.file() << ":" << location.line(); + } else { + std::string metadata_json_str; + auto status = + MessageToJsonString(request.node().metadata(), &metadata_json_str, + google::protobuf::util::JsonPrintOptions()); + ASSERT_TRUE(status.ok()) + << status << " on " << location.file() << ":" << location.line(); + auto metadata_json = Json::Parse(metadata_json_str); + ASSERT_TRUE(metadata_json.ok()) + << metadata_json.status() << " on " << location.file() << ":" + << location.line(); + EXPECT_EQ(*metadata_json, xds_client_->bootstrap().node()->metadata()) + << location.file() << ":" << location.line() << ":\nexpected: " + << Json{xds_client_->bootstrap().node()->metadata()}.Dump() + << "\nactual: " << metadata_json->Dump(); + } + // These are hard-coded by XdsClient. + EXPECT_EQ(request.node().user_agent_name(), + absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().user_agent_version(), + absl::StrCat("C-core ", grpc_version_string())) + << location.file() << ":" << location.line(); + if (check_build_version) { + auto build_version = GetBuildVersion(request.node()); + ASSERT_TRUE(build_version.has_value()) + << location.file() << ":" << location.line(); + EXPECT_EQ(*build_version, + absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ", + grpc_version_string())) + << location.file() << ":" << location.line(); + } + } + + // Helper function to find the "build_version" field, which was + // removed in v3, but which we still populate in v2. + static absl::optional GetBuildVersion( + const envoy::config::core::v3::Node& node) { + const auto& unknown_field_set = + node.GetReflection()->GetUnknownFields(node); + for (int i = 0; i < unknown_field_set.field_count(); ++i) { + const auto& unknown_field = unknown_field_set.field(i); + if (unknown_field.number() == 5) return unknown_field.length_delimited(); + } + return absl::nullopt; + } + + RefCountedPtr transport_factory_; + RefCountedPtr xds_client_; +}; + +TEST_F(XdsClientTest, BasicWatch) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, UpdateFromServer) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Server sends an updated version of the resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo1", 9}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, MultipleWatchersForSameResource) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a second watcher for the same resource. + auto watcher2 = StartFooWatch("foo1"); + // This watcher should get an immediate notification, because the + // resource is already cached. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Server should not have seen another request from the client. + ASSERT_FALSE(stream->HaveMessageFromClient()); + // Server sends an updated version of the resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo1", 9}) + .Serialize()); + // XdsClient should deliver the response to both watchers. + resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel one of the watchers. + CancelFooWatch(watcher.get(), "foo1"); + // The server should not see any new request. + ASSERT_FALSE(WaitForRequest(stream.get())); + // Now cancel the second watcher. + CancelFooWatch(watcher2.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, SubscribeToMultipleResources) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for "foo2". + auto watcher2 = StartFooWatch("foo2"); + // XdsClient should have sent a subscription request on the ADS stream. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo2", 7}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo2"); + EXPECT_EQ(resource->value, 7); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); + // Now cancel watch for "foo2". + CancelFooWatch(watcher2.get(), "foo2"); + // The XdsClient may or may not send another unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for "foo2". + auto watcher2 = StartFooWatch("foo2"); + // XdsClient should have sent a subscription request on the ADS stream. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo2", 7}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo2"); + EXPECT_EQ(resource->value, 7); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Server sends an update for "foo1". The response does not contain "foo2". + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("C") + .AddFooResource(XdsFooResource{"foo1", 9}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"C", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"C", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); + // Now cancel watch for "foo2". + CancelFooWatch(watcher2.get(), "foo2"); + // The XdsClient may or may not send another unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"C", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ResourceValidationFailure) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response containing an invalid resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo1\",\"value\":[]}") + .Serialize()); + // XdsClient should deliver an error to the watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // XdsClient should NACK the update. + // Note that version_info is not populated in the request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest( + *request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"A", + /*error_detail=*/ + absl::InvalidArgumentError( + "xDS response validation errors: [" + "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number]]"), + /*resource_names=*/{"foo1"}); + // Start a second watch for the same resource. It should immediately + // receive the same error. + auto watcher2 = StartFooWatch("foo1"); + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // Now server sends an updated version of the resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo1", 9}) + .Serialize()); + // XdsClient should deliver the response to both watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 9); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Before the server responds, add a watch for another resource. + auto watcher2 = StartFooWatch("foo2"); + // Client should send another request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Add a watch for a third resource. + auto watcher3 = StartFooWatch("foo3"); + // Client should send another request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2", "foo3"}); + // Add a watch for a fourth resource. + auto watcher4 = StartFooWatch("foo4"); + // Client should send another request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); + // Server sends a response containing three invalid resources and one + // valid resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + // foo1: JSON parsing succeeds, so we know the resource name, + // but validation fails. + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo1\",\"value\":[]}") + // foo2: JSON parsing fails, and not wrapped in a Resource + // wrapper, so we don't actually know the resource's name. + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo2,\"value\":6}") + // foo3: JSON parsing fails, but it is wrapped in a Resource + // wrapper, so we do know the resource's name. + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo3,\"value\":6}", + /*resource_wrapper_name=*/"foo3") + // foo4: valid resource. + .AddFooResource(XdsFooResource{"foo4", 5}) + .Serialize()); + // XdsClient should deliver an error to the watchers for foo1 and foo3. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + error = watcher3->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "invalid resource: INVALID_ARGUMENT: JSON parsing failed: " + "[JSON parse error at index 15] (node ID:xds_client_test)") + << *error; + // It cannot delivery an error for foo2, because the client doesn't know + // that that resource in the response was actually supposed to be foo2. + EXPECT_FALSE(watcher2->HasEvent()); + // It will delivery a valid resource update for foo4. + auto resource = watcher4->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo4"); + EXPECT_EQ(resource->value, 5); + // XdsClient should NACK the update. + // There was one good resource, so the version will be updated. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/ + absl::InvalidArgumentError( + "xDS response validation errors: [" + // foo1 + "resource index 0: foo1: " + "INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number]; " + // foo2 (name not known) + "resource index 1: INVALID_ARGUMENT: JSON parsing failed: " + "[JSON parse error at index 15]; " + // foo3 + "resource index 2: foo3: " + "INVALID_ARGUMENT: JSON parsing failed: " + "[JSON parse error at index 15]]"), + /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); + // Cancel watches. + CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher.get(), "foo2", /*delay_unsubscription=*/true); + CancelFooWatch(watcher.get(), "foo3", /*delay_unsubscription=*/true); + CancelFooWatch(watcher.get(), "foo4"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Send an update containing an invalid resource. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo1\",\"value\":[]}") + .Serialize()); + // XdsClient should deliver an error to the watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // XdsClient should NACK the update. + // Note that version_info is set to the previous version in this request, + // because there were no valid resources in it. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest( + *request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/ + absl::InvalidArgumentError( + "xDS response validation errors: [" + "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number]]"), + /*resource_names=*/{"foo1"}); + // Start a second watcher for the same resource. Even though the last + // update was a NACK, we should still deliver the cached resource to + // the watcher. + // TODO(roth): Consider what the right behavior is here. It seems + // inconsistent that the watcher sees the error if it had started + // before the error was seen but does not if it was started afterwards. + // One option is to not send errors at all for already-cached resources; + // another option is to send the errors even for newly started watchers. + auto watcher2 = StartFooWatch("foo1"); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Cancel watches. + CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher2.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ResourceDoesNotExist) { + InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Do not send a response, but wait for the resource to be reported as + // not existing. + EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); + // Start a new watcher for the same resource. It should immediately + // receive the same does-not-exist notification. + auto watcher2 = StartFooWatch("foo1"); + EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); + // Now server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, StreamClosedByServer) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Now server closes the stream. + stream->MaybeSendStatusToClient(absl::OkStatus()); + // XdsClient should report error to watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // XdsClient should create a new stream. + stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient sends a subscription request. + // Note that the version persists from the previous stream, but the + // nonce does not. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Before the server resends the resource, start a new watcher for the + // same resource. This watcher should immediately receive the cached + // resource and then the error notification -- in that order. + auto watcher2 = StartFooWatch("foo1"); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // Create a watcher for a new resource. This should immediately + // receive the cached channel error. + auto watcher3 = StartFooWatch("foo2"); + error = watcher3->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // And the client will send a new request subscribing to the new resource. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Server now sends the requested resources. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("B") + .AddFooResource(XdsFooResource{"foo1", 6}) + .AddFooResource(XdsFooResource{"foo2", 7}) + .Serialize()); + // Watchers for foo1 do NOT get an update, since the resource has not changed. + EXPECT_FALSE(watcher->WaitForNextResource()); + EXPECT_FALSE(watcher2->WaitForNextResource()); + // The watcher for foo2 gets the newly delivered resource. + resource = watcher3->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo2"); + EXPECT_EQ(resource->value, 7); + // XdsClient sends an ACK. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Cancel watchers. + CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher3.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) { + // Lower resources-does-not-exist timeout, to make sure that we're not + // triggering that here. + InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Now server closes the stream. + stream->MaybeSendStatusToClient(absl::OkStatus()); + // XdsClient should report error to watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // XdsClient should create a new stream. + stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient sends a subscription request. + // Note that the version persists from the previous stream, but the + // nonce does not. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Server does NOT send the resource again. + // Watcher should not get any update, since the resource has not changed. + // We wait 5s here to ensure we don't trigger the resource-does-not-exist + // timeout. + EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ConnectionFails) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Transport reports connection failure. + TriggerConnectionFailure(xds_client_->bootstrap().server(), + absl::UnavailableError("connection failed")); + // XdsClient should report an error to the watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: " + "connection failed (node ID:xds_client_test)") + << *error; + // Start a new watch. This watcher should be given the same error, + // since we have not yet recovered. + auto watcher2 = StartFooWatch("foo1"); + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: " + "connection failed (node ID:xds_client_test)") + << *error; + // Inside the XdsTransport interface, the channel will eventually + // reconnect, and the call will proceed. None of that will be visible + // to the XdsClient, because the call uses wait_for_ready. So here, + // to simulate the connection being established, all we need to do is + // allow the stream to proceed. + // Server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response with the resource wrapped in a Resource message. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}, + /*in_resource_wrapper=*/true) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, MultipleResourceTypes) { + InitXdsClient(); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for "bar1". + auto watcher2 = StartBarWatch("bar1"); + // XdsClient should have sent a subscription request on the ADS stream. + // Note that version and nonce here do NOT use the values for Foo, + // since each resource type has its own state. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsBarResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"bar1"}); + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsBarResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddBarResource(XdsBarResource{"bar1", "whee"}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource2 = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource2.has_value()); + EXPECT_EQ(resource2->name, "bar1"); + EXPECT_EQ(resource2->value, "whee"); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsBarResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"bar1"}); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + // Now cancel watch for "bar1". + CancelBarWatch(watcher2.get(), "bar1"); + // The XdsClient may or may not send another unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsBarResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, BasicWatchV2) { + InitXdsClient(FakeXdsBootstrap::Builder().set_use_v2()); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = transport_factory_->WaitForStream( + xds_client_->bootstrap().server(), FakeXdsTransportFactory::kAdsV2Method, + absl::Seconds(5) * grpc_test_slowdown_factor()); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Node Should be present on the first request. + CheckRequestNode(*request, /*check_build_version=*/true); + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->v2_type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->v2_type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, Federation) { + ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); + constexpr char kAuthority[] = "xds.example.com"; + const std::string kXdstpResourceName = absl::StrCat( + "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); + FakeXdsBootstrap::FakeXdsServer authority_server; + authority_server.set_server_uri("other_xds_server"); + FakeXdsBootstrap::FakeAuthority authority; + authority.set_server(authority_server); + InitXdsClient( + FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream to the top-level xDS server. + auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for the xdstp resource name. + auto watcher2 = StartFooWatch(kXdstpResourceName); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher2->HasEvent()); + // XdsClient will create a new stream to the server for this authority. + auto stream2 = WaitForAdsStream(authority_server); + ASSERT_TRUE(stream2 != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + // Note that version and nonce here do NOT use the values for Foo, + // since each authority has its own state. + request = WaitForRequest(stream2.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream2->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, kXdstpResourceName); + EXPECT_EQ(resource->value, 3); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream2.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + // Now cancel watch for xdstp resource name. + CancelFooWatch(watcher2.get(), kXdstpResourceName); + // The XdsClient may or may not send the unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream2.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) { + ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); + constexpr char kAuthority[] = "xds.example.com"; + const std::string kXdstpResourceName = absl::StrCat( + "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); + // Authority does not specify any xDS servers, so XdsClient will use + // the top-level xDS server in the bootstrap config for this authority. + InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority( + kAuthority, FakeXdsBootstrap::FakeAuthority())); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream to the top-level xDS server. + auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for the xdstp resource name. + auto watcher2 = StartFooWatch(kXdstpResourceName); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher2->HasEvent()); + // XdsClient will send a subscription request on the ADS stream that + // includes both resources, since both are being obtained from the + // same server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", kXdstpResourceName}); + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, kXdstpResourceName); + EXPECT_EQ(resource->value, 3); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", kXdstpResourceName}); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + // Now cancel watch for xdstp resource name. + CancelFooWatch(watcher2.get(), kXdstpResourceName); + // The XdsClient may or may not send the unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, FederationWithUnknownAuthority) { + ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); + constexpr char kAuthority[] = "xds.example.com"; + const std::string kXdstpResourceName = absl::StrCat( + "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); + // Note: Not adding authority to bootstrap config. + InitXdsClient(); + // Start a watch for the xdstp resource name. + auto watcher = StartFooWatch(kXdstpResourceName); + // Watcher should immediately get an error about the unknown authority. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "authority \"xds.example.com\" not present in bootstrap config") + << *error; +} + +TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) { + ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); + // Note: Not adding authority to bootstrap config. + InitXdsClient(); + // Start a watch for the xdstp resource name. + auto watcher = StartFooWatch("xdstp://x"); + // Watcher should immediately get an error about the unknown authority. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), "Unable to parse resource name xdstp://x") + << *error; +} + +TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) { + // We will use this xdstp name, whose authority is not present in + // the bootstrap config. But since federation is not enabled, we + // will treat this as an opaque old-style name, so we'll send it to + // the default server. + constexpr char kXdstpResourceName[] = + "xdstp://xds.example.com/test.v3.foo/foo1"; + InitXdsClient(); + // Start a watch for the xdstp name. + auto watcher = StartFooWatch(kXdstpResourceName); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{kXdstpResourceName, 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, kXdstpResourceName); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + // Cancel watch. + CancelFooWatch(watcher.get(), kXdstpResourceName); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) { + ScopedExperimentalEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION"); + constexpr char kAuthority[] = "xds.example.com"; + const std::string kXdstpResourceName = absl::StrCat( + "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); + FakeXdsBootstrap::FakeXdsServer authority_server; + authority_server.set_server_uri("other_xds_server"); + FakeXdsBootstrap::FakeAuthority authority; + authority.set_server(authority_server); + InitXdsClient( + FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream to the top-level xDS server. + auto stream = WaitForAdsStream(xds_client_->bootstrap().server()); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource{"foo1", 6}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Start a watch for the xdstp resource name. + auto watcher2 = StartFooWatch(kXdstpResourceName); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher2->HasEvent()); + // XdsClient will create a new stream to the server for this authority. + auto stream2 = WaitForAdsStream(authority_server); + ASSERT_TRUE(stream2 != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + // Note that version and nonce here do NOT use the values for Foo, + // since each authority has its own state. + request = WaitForRequest(stream2.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream2->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource{kXdstpResourceName, 3}) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, kXdstpResourceName); + EXPECT_EQ(resource->value, 3); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream2.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{kXdstpResourceName}); + // Now cause a channel failure on the stream to the authority's xDS server. + TriggerConnectionFailure(authority_server, + absl::UnavailableError("connection failed")); + // The watcher for the xdstp resource name should see the error. + auto error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server other_xds_server: connection failed " + "(node ID:xds_client_test)") + << *error; + // The watcher for "foo1" should not see any error. + EXPECT_FALSE(watcher->HasEvent()); + // Cancel watch for "foo1". + CancelFooWatch(watcher.get(), "foo1"); + // XdsClient should send an unsubscription request. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + // Now cancel watch for xdstp resource name. + CancelFooWatch(watcher2.get(), kXdstpResourceName); + // The XdsClient may or may not send the unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream2.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc new file mode 100644 index 00000000000..12b57134545 --- /dev/null +++ b/test/core/xds/xds_transport_fake.cc @@ -0,0 +1,230 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "test/core/xds/xds_transport_fake.h" + +#include +#include +#include + +#include + +#include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "test/core/util/test_config.h" + +using grpc_event_engine::experimental::GetDefaultEventEngine; + +namespace grpc_core { + +// +// FakeXdsTransportFactory::FakeStreamingCall +// + +void FakeXdsTransportFactory::FakeStreamingCall::Orphan() { + { + MutexLock lock(&mu_); + // Can't call event_handler_->OnStatusReceived() or unref event_handler_ + // synchronously, since those operations will trigger code in + // XdsClient that acquires its mutex, but it was already holding its + // mutex when it called us, so it would deadlock. + GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), + status_sent = status_sent_]() mutable { + ExecCtx exec_ctx; + if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); + event_handler.reset(); + }); + status_sent_ = true; + } + transport_->RemoveStream(method_, this); + Unref(); +} + +void FakeXdsTransportFactory::FakeStreamingCall::SendMessage( + std::string payload) { + MutexLock lock(&mu_); + from_client_messages_.push_back(std::move(payload)); + cv_.Signal(); + // Can't call event_handler_->OnRequestSent() synchronously, since that + // operation will trigger code in XdsClient that acquires its mutex, but it + // was already holding its mutex when it called us, so it would deadlock. + GetDefaultEventEngine()->Run( + [event_handler = event_handler_->Ref()]() mutable { + ExecCtx exec_ctx; + event_handler->OnRequestSent(/*ok=*/true); + event_handler.reset(); + }); +} + +bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() { + MutexLock lock(&mu_); + return !from_client_messages_.empty(); +} + +absl::optional +FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient( + absl::Duration timeout) { + MutexLock lock(&mu_); + while (from_client_messages_.empty()) { + if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { + return absl::nullopt; + } + } + std::string payload = from_client_messages_.front(); + from_client_messages_.pop_front(); + return payload; +} + +void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient( + absl::string_view payload) { + ExecCtx exec_ctx; + RefCountedPtr event_handler; + { + MutexLock lock(&mu_); + event_handler = event_handler_->Ref(); + } + event_handler->OnRecvMessage(payload); +} + +void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient( + absl::Status status) { + ExecCtx exec_ctx; + RefCountedPtr event_handler; + { + MutexLock lock(&mu_); + if (status_sent_) return; + status_sent_ = true; + event_handler = event_handler_->Ref(); + } + event_handler->OnStatusReceived(std::move(status)); +} + +// +// FakeXdsTransportFactory::FakeXdsTransport +// + +void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure( + absl::Status status) { + RefCountedPtr on_connectivity_failure; + { + MutexLock lock(&mu_); + on_connectivity_failure = on_connectivity_failure_->Ref(); + } + ExecCtx exec_ctx; + if (on_connectivity_failure != nullptr) { + on_connectivity_failure->Run(std::move(status)); + } +} + +void FakeXdsTransportFactory::FakeXdsTransport::Orphan() { + { + MutexLock lock(&mu_); + // Can't destroy on_connectivity_failure_ synchronously, since that + // operation will trigger code in XdsClient that acquires its mutex, but + // it was already holding its mutex when it called us, so it would deadlock. + GetDefaultEventEngine()->Run([on_connectivity_failure = std::move( + on_connectivity_failure_)]() mutable { + ExecCtx exec_ctx; + on_connectivity_failure.reset(); + }); + } + Unref(); +} + +RefCountedPtr +FakeXdsTransportFactory::FakeXdsTransport::WaitForStream( + const char* method, absl::Duration timeout) { + MutexLock lock(&mu_); + auto it = active_calls_.find(method); + while (it == active_calls_.end() || it->second == nullptr) { + if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { + return nullptr; + } + it = active_calls_.find(method); + } + return it->second; +} + +void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream( + const char* method, FakeStreamingCall* call) { + MutexLock lock(&mu_); + auto it = active_calls_.find(method); + if (it != active_calls_.end() && it->second.get() == call) { + active_calls_.erase(it); + } +} + +OrphanablePtr +FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall( + const char* method, + std::unique_ptr event_handler) { + auto call = MakeOrphanable(Ref(), method, + std::move(event_handler)); + MutexLock lock(&mu_); + active_calls_[method] = call->Ref(); + cv_.Signal(); + return call; +} + +// +// FakeXdsTransportFactory +// + +constexpr char FakeXdsTransportFactory::kAdsMethod[]; +constexpr char FakeXdsTransportFactory::kAdsV2Method[]; + +OrphanablePtr +FakeXdsTransportFactory::Create( + const XdsBootstrap::XdsServer& server, + std::function on_connectivity_failure, + absl::Status* /*status*/) { + auto transport = + MakeOrphanable(std::move(on_connectivity_failure)); + MutexLock lock(&mu_); + auto& entry = transport_map_[&server]; + GPR_ASSERT(entry == nullptr); + entry = transport->Ref(); + return transport; +} + +void FakeXdsTransportFactory::TriggerConnectionFailure( + const XdsBootstrap::XdsServer& server, absl::Status status) { + auto transport = GetTransport(server); + transport->TriggerConnectionFailure(std::move(status)); +} + +RefCountedPtr +FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server, + const char* method, + absl::Duration timeout) { + auto transport = GetTransport(server); + return transport->WaitForStream(method, timeout); +} + +RefCountedPtr +FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) { + MutexLock lock(&mu_); + RefCountedPtr transport = transport_map_[&server]; + GPR_ASSERT(transport != nullptr); + return transport; +} + +} // namespace grpc_core diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h new file mode 100644 index 00000000000..ed0d746c90e --- /dev/null +++ b/test/core/xds/xds_transport_fake.h @@ -0,0 +1,180 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H +#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H + +#include + +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/types/optional.h" + +#include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/ext/xds/xds_transport.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" + +namespace grpc_core { + +class FakeXdsTransportFactory : public XdsTransportFactory { + private: + class FakeXdsTransport; + + public: + static constexpr char kAdsMethod[] = + "/envoy.service.discovery.v3.AggregatedDiscoveryService/" + "StreamAggregatedResources"; + static constexpr char kAdsV2Method[] = + "/envoy.service.discovery.v2.AggregatedDiscoveryService/" + "StreamAggregatedResources"; + + class FakeStreamingCall : public XdsTransport::StreamingCall { + public: + FakeStreamingCall( + RefCountedPtr transport, const char* method, + std::unique_ptr event_handler) + : transport_(std::move(transport)), + method_(method), + event_handler_(MakeRefCounted( + std::move(event_handler))) {} + + void Orphan() override; + + using StreamingCall::Ref; // Make it public. + + bool HaveMessageFromClient(); + absl::optional WaitForMessageFromClient( + absl::Duration timeout); + + void SendMessageToClient(absl::string_view payload); + void MaybeSendStatusToClient(absl::Status status); + + private: + class RefCountedEventHandler : public RefCounted { + public: + explicit RefCountedEventHandler( + std::unique_ptr event_handler) + : event_handler_(std::move(event_handler)) {} + + void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); } + void OnRecvMessage(absl::string_view payload) { + event_handler_->OnRecvMessage(payload); + } + void OnStatusReceived(absl::Status status) { + event_handler_->OnStatusReceived(std::move(status)); + } + + private: + std::unique_ptr event_handler_; + }; + + void SendMessage(std::string payload) override; + + RefCountedPtr transport_; + const char* method_; + + Mutex mu_; + CondVar cv_; + RefCountedPtr event_handler_ ABSL_GUARDED_BY(&mu_); + std::deque from_client_messages_ ABSL_GUARDED_BY(&mu_); + bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; + }; + + FakeXdsTransportFactory() = default; + + using XdsTransportFactory::Ref; // Make it public. + + void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, + absl::Status status); + + RefCountedPtr WaitForStream( + const XdsBootstrap::XdsServer& server, const char* method, + absl::Duration timeout); + + void Orphan() override { Unref(); } + + private: + class FakeXdsTransport : public XdsTransport { + public: + explicit FakeXdsTransport( + std::function on_connectivity_failure) + : on_connectivity_failure_( + MakeRefCounted( + std::move(on_connectivity_failure))) {} + + void Orphan() override; + + using XdsTransport::Ref; // Make it public. + + void TriggerConnectionFailure(absl::Status status); + + RefCountedPtr WaitForStream(const char* method, + absl::Duration timeout); + + void RemoveStream(const char* method, FakeStreamingCall* call); + + private: + class RefCountedOnConnectivityFailure + : public RefCounted { + public: + explicit RefCountedOnConnectivityFailure( + std::function on_connectivity_failure) + : on_connectivity_failure_(std::move(on_connectivity_failure)) {} + + void Run(absl::Status status) { + on_connectivity_failure_(std::move(status)); + } + + private: + std::function on_connectivity_failure_; + }; + + OrphanablePtr CreateStreamingCall( + const char* method, + std::unique_ptr event_handler) override; + + void ResetBackoff() override {} + + Mutex mu_; + CondVar cv_; + RefCountedPtr on_connectivity_failure_ + ABSL_GUARDED_BY(&mu_); + std::map> + active_calls_ ABSL_GUARDED_BY(&mu_); + }; + + OrphanablePtr Create( + const XdsBootstrap::XdsServer& server, + std::function on_connectivity_failure, + absl::Status* status) override; + + RefCountedPtr GetTransport( + const XdsBootstrap::XdsServer& server); + + Mutex mu_; + std::map> + transport_map_ ABSL_GUARDED_BY(&mu_); +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 868e0b6966f..16cfa1369b1 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -87,10 +87,10 @@ TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) { const auto response_state = WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::ContainsRegex(absl::StrCat( - kDefaultClusterName, - ": validation error.*DiscoveryType is not valid"))); + EXPECT_EQ(response_state->error_message, + "xDS response validation errors: [resource index 0: cluster_name: " + "INVALID_ARGUMENT: errors parsing CDS resource: [" + "DiscoveryType is not valid.]]"); CheckRpcSendOk(DEBUG_LOCATION); } diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc index 38cc2bc7c79..6dadb621130 100644 --- a/test/cpp/end2end/xds/xds_core_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc @@ -182,19 +182,18 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) { ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; EXPECT_EQ( response_state->error_message, - absl::StrCat( - "xDS response validation errors: [" - "resource index 0: Can't decode Resource proto wrapper; ", - "resource index 1: foo: " - "INVALID_ARGUMENT: Can't parse Cluster resource.; " - "resource index 3: ", - kClusterName2, - ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " - "[DiscoveryType is not valid.]; " - "resource index 4: ", - kClusterName3, - ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " - "[DiscoveryType is not valid.]]")); + absl::StrCat("xDS response validation errors: [" + "resource index 0: Can't decode Resource proto wrapper; ", + "resource index 1: foo: " + "INVALID_ARGUMENT: Can't parse Cluster resource.; " + "resource index 3: ", + kClusterName2, + ": INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]; " + "resource index 4: ", + kClusterName3, + ": INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]]")); // RPCs for default cluster should succeed. std::vector> metadata_default_cluster = { {"cluster", kDefaultClusterName}, @@ -312,11 +311,10 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) { balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); const auto response_state = WaitForLdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::ContainsRegex(absl::StrCat( - kServerName, - ": validation error.*" - "Listener has neither address nor ApiListener.*"))); + EXPECT_EQ(response_state->error_message, + "xDS response validation errors: [" + "resource index 0: server.example.com: " + "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); // Need to create a second channel to subscribe to a second LDS resource. auto channel2 = CreateChannel(0, kServerName2); auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); @@ -330,16 +328,13 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) { // Wait for second NACK to be reported to xDS server. const auto response_state = WaitForLdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::ContainsRegex(absl::StrCat( - kServerName, - ": validation error.*" - "Listener has neither address nor ApiListener.*"))); - EXPECT_THAT(response_state->error_message, - ::testing::ContainsRegex(absl::StrCat( - kServerName2, - ": validation error.*" - "Listener has neither address nor ApiListener.*"))); + EXPECT_EQ( + response_state->error_message, + "xDS response validation errors: [" + "resource index 0: server.other.com: " + "INVALID_ARGUMENT: Listener has neither address nor ApiListener; " + "resource index 1: server.example.com: " + "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); } // Now start a new channel with a third server name, this one with a // valid resource. @@ -371,11 +366,10 @@ TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) { const auto response_state = WaitForLdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT(response_state->error_message, - ::testing::ContainsRegex(absl::StrCat( - kServerName, - ": validation error.*" - "Listener has neither address nor ApiListener"))); + EXPECT_EQ(response_state->error_message, + "xDS response validation errors: [" + "resource index 0: server.example.com: " + "INVALID_ARGUMENT: Listener has neither address nor ApiListener]"); CheckRpcSendOk(DEBUG_LOCATION); } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index e85edc2a602..189092557cc 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7869,6 +7869,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "xds_client_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,