Revert "XdsClient: add unit test and fix watcher notification bugs (#30823)" (#30942)

This reverts commit bcd8c991e6.
pull/30945/head
Richard Belleville 2 years ago committed by GitHub
parent 2297249e47
commit 6d2c4a8314
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      CMakeLists.txt
  2. 15
      build_autogenerated.yaml
  3. 204
      src/core/ext/xds/xds_client.cc
  4. 17
      src/core/ext/xds/xds_client.h
  5. 6
      src/core/ext/xds/xds_transport_grpc.cc
  6. 7
      src/core/lib/json/json_object_loader.h
  7. 40
      test/core/xds/BUILD
  8. 2303
      test/core/xds/xds_client_test.cc
  9. 230
      test/core/xds/xds_transport_fake.cc
  10. 180
      test/core/xds/xds_transport_fake.h
  11. 8
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  12. 42
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  13. 24
      tools/run_tests/generated/tests.json

49
CMakeLists.txt generated

@ -1248,7 +1248,6 @@ if(gRPC_BUILD_TESTS)
endif()
add_dependencies(buildtests_cxx xds_bootstrap_test)
add_dependencies(buildtests_cxx xds_certificate_provider_test)
add_dependencies(buildtests_cxx xds_client_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_cluster_end2end_test)
endif()
@ -19890,54 +19889,6 @@ target_link_libraries(xds_certificate_provider_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_client_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
test/core/xds/xds_client_test.cc
test/core/xds/xds_transport_fake.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(xds_client_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_client_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -10851,21 +10851,6 @@ targets:
- test/core/xds/xds_certificate_provider_test.cc
deps:
- grpc_test_util
- name: xds_client_test
gtest: true
build: test
language: c++
headers:
- test/core/xds/xds_transport_fake.h
src:
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/discovery.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- test/core/xds/xds_client_test.cc
- test/core/xds/xds_transport_fake.cc
deps:
- grpc_test_util
uses_polling: false
- name: xds_cluster_end2end_test
gtest: true
build: test

@ -433,11 +433,11 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
server,
[self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
absl::Status status) {
self->OnConnectivityFailure(std::move(status));
self->OnConnectivityStateChange(std::move(status));
},
&status);
GPR_ASSERT(transport_ != nullptr);
if (!status.ok()) SetChannelStatusLocked(std::move(status));
if (!status.ok()) OnConnectivityStateChangeLocked(std::move(status));
}
XdsClient::ChannelState::~ChannelState() {
@ -475,6 +475,10 @@ XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
return lrs_calld_->calld();
}
bool XdsClient::ChannelState::HasActiveAdsCall() const {
return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
}
void XdsClient::ChannelState::MaybeStartLrsCall() {
if (lrs_calld_ != nullptr) return;
lrs_calld_.reset(new RetryableCall<LrsCallState>(
@ -518,54 +522,27 @@ void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
}
}
void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) {
void XdsClient::ChannelState::OnConnectivityStateChange(absl::Status status) {
{
MutexLock lock(&xds_client_->mu_);
SetChannelStatusLocked(std::move(status));
OnConnectivityStateChangeLocked(std::move(status));
}
xds_client_->work_serializer_.DrainQueue();
}
void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) {
if (shutting_down_) return;
status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
server_.server_uri(), ": ",
status.message()));
gpr_log(GPR_INFO, "[xds_client %p] %s", xds_client(),
void XdsClient::ChannelState::OnConnectivityStateChangeLocked(
absl::Status status) {
if (!shutting_down_) {
// Notify all watchers of error.
gpr_log(GPR_INFO,
"[xds_client %p] xds channel for server %s in "
"state TRANSIENT_FAILURE: %s",
xds_client(), server_.server_uri().c_str(),
status.ToString().c_str());
// If the node ID is set, append that to the status message that we send to
// the watchers, so that it will appear in log messages visible to users.
const auto* node = xds_client_->bootstrap_->node();
if (node != nullptr) {
status = absl::Status(
status.code(),
absl::StrCat(status.message(),
" (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
xds_client_->NotifyOnErrorLocked(absl::UnavailableError(
absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ",
status.ToString())));
}
// Save status in channel, so that we can immediately generate an
// error for any new watchers that may be started.
status_ = status;
// Find all watchers for this channel.
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
for (const auto& a : xds_client_->authority_state_map_) { // authority
if (a.second.channel_state != this) continue;
for (const auto& t : a.second.resource_map) { // type
for (const auto& r : t.second) { // resource id
for (const auto& w : r.second.watchers) { // watchers
watchers.insert(w.second);
}
}
}
}
// Enqueue notification for the watchers.
xds_client_->work_serializer_.Schedule(
[watchers = std::move(watchers), status = std::move(status)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(xds_client_->work_serializer_) {
for (const auto& watcher : watchers) {
watcher->OnError(status);
}
},
DEBUG_LOCATION);
}
//
@ -732,32 +709,19 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
XdsResourceType::DecodeContext context = {
xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace,
xds_client()->symtab_.ptr(), arena};
absl::StatusOr<XdsResourceType::DecodeResult> decode_result =
absl::StatusOr<XdsResourceType::DecodeResult> result =
result_.type->Decode(context, serialized_resource, is_v2);
// If we didn't already have the resource name from the Resource
// wrapper, try to get it from the decoding result.
if (resource_name.empty()) {
if (decode_result.ok()) {
resource_name = decode_result->name;
error_prefix =
absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
} else {
// We don't have any way of determining the resource name, so
// there's nothing more we can do here.
if (!result.ok()) {
result_.errors.emplace_back(
absl::StrCat(error_prefix, decode_result.status().ToString()));
absl::StrCat(error_prefix, result.status().ToString()));
return;
}
}
// If decoding failed, make sure we include the error in the NACK.
const absl::Status& decode_status = decode_result.ok()
? decode_result->resource.status()
: decode_result.status();
if (!decode_status.ok()) {
result_.errors.emplace_back(
absl::StrCat(error_prefix, decode_status.ToString()));
}
// Check the resource name.
if (resource_name.empty()) {
resource_name = result->name;
error_prefix =
absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
}
auto parsed_resource_name =
xds_client()->ParseXdsResourceName(resource_name, result_.type);
if (!parsed_resource_name.ok()) {
@ -814,12 +778,16 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
resource_state.ignored_deletion = false;
}
// Update resource state based on whether the resource is valid.
if (!decode_status.ok()) {
if (!result->resource.ok()) {
result_.errors.emplace_back(absl::StrCat(
error_prefix,
"validation error: ", result->resource.status().ToString()));
xds_client()->NotifyWatchersOnErrorLocked(
resource_state.watchers,
absl::UnavailableError(
absl::StrCat("invalid resource: ", decode_status.ToString())));
UpdateResourceMetadataNacked(result_.version, decode_status.ToString(),
absl::UnavailableError(absl::StrCat(
"invalid resource: ", result->resource.status().ToString())));
UpdateResourceMetadataNacked(result_.version,
result->resource.status().ToString(),
update_time_, &resource_state.meta);
return;
}
@ -828,7 +796,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
// If it didn't change, ignore it.
if (resource_state.resource != nullptr &&
result_.type->ResourcesEqual(resource_state.resource.get(),
decode_result->resource->get())) {
result->resource->get())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s resource %s identical to current, ignoring.",
@ -838,7 +806,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
return;
}
// Update the resource state.
resource_state.resource = std::move(*decode_result->resource);
resource_state.resource = std::move(*result->resource);
resource_state.meta = CreateResourceMetadataAcked(
std::string(serialized_resource), result_.version, update_time_);
// Notify watchers.
@ -1019,12 +987,11 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
status.ToString().c_str());
} else {
seen_response_ = true;
chand()->status_ = absl::OkStatus();
AdsResponseParser::Result result = parser.TakeResult();
// Update nonce.
auto& state = state_map_[result.type];
state.nonce = result.nonce;
// If we got an error, set state.status so that we'll NACK the update.
// If we got an error, set state.error so that we'll NACK the update.
if (!result.errors.empty()) {
state.status = absl::UnavailableError(
absl::StrCat("xDS response validation errors: [",
@ -1117,9 +1084,10 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
parent_->OnCallFinishedLocked();
// Send error to all watchers for the channel.
chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat(
"xDS call failed; status: %s", status.ToString().c_str())));
// Send error to all watchers.
xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat(
"xDS call failed: xDS server: %s, ADS call status: %s",
chand()->server_.server_uri(), status.ToString().c_str())));
}
}
xds_client()->work_serializer_.DrainQueue();
@ -1493,16 +1461,17 @@ void XdsClient::WatchResource(const XdsResourceType* type,
invalid_watchers_[w] = watcher;
}
work_serializer_.Run(
[watcher = std::move(watcher), status = std::move(status)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnError(status);
},
DEBUG_LOCATION);
};
auto resource_name = ParseXdsResourceName(name, type);
if (!resource_name.ok()) {
fail(absl::UnavailableError(
absl::StrCat("Unable to parse resource name ", name)));
fail(absl::UnavailableError(absl::StrFormat(
"Unable to parse resource name for listener %s", name)));
return;
}
// Find server to use.
@ -1542,39 +1511,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
delete value;
},
DEBUG_LOCATION);
} else if (resource_state.meta.client_status ==
XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] reporting cached does-not-exist for %s", this,
std::string(name).c_str());
}
work_serializer_.Schedule(
[watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnResourceDoesNotExist();
},
DEBUG_LOCATION);
} else if (resource_state.meta.client_status ==
XdsApi::ResourceMetadata::NACKED) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] reporting cached validation failure for %s: %s",
this, std::string(name).c_str(),
resource_state.meta.failed_details.c_str());
}
std::string details = resource_state.meta.failed_details;
const auto* node = bootstrap_->node();
if (node != nullptr) {
absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")");
}
work_serializer_.Schedule(
[watcher, details = std::move(details)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnError(absl::UnavailableError(
absl::StrCat("invalid resource: ", details)));
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
@ -1582,21 +1518,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
authority_state.channel_state =
GetOrCreateChannelStateLocked(*xds_server, "start watch");
}
absl::Status channel_status = authority_state.channel_state->status();
if (!channel_status.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] returning cached channel error for %s: %s",
this, std::string(name).c_str(),
channel_status.ToString().c_str());
}
work_serializer_.Schedule(
[watcher = std::move(watcher), status = std::move(channel_status)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) mutable {
watcher->OnError(std::move(status));
},
DEBUG_LOCATION);
}
authority_state.channel_state->SubscribeLocked(type, *resource_name);
}
work_serializer_.DrainQueue();
@ -1860,6 +1781,34 @@ void XdsClient::ResetBackoff() {
}
}
void XdsClient::NotifyOnErrorLocked(absl::Status status) {
const auto* node = bootstrap_->node();
if (node != nullptr) {
status = absl::Status(
status.code(),
absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
}
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
for (const auto& a : authority_state_map_) { // authority
for (const auto& t : a.second.resource_map) { // type
for (const auto& r : t.second) { // resource id
for (const auto& w : r.second.watchers) { // watchers
watchers.insert(w.second);
}
}
}
}
work_serializer_.Schedule(
// TODO(yashykt): When we move to C++14, capture watchers using
// std::move()
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
for (const auto& watcher : watchers) {
watcher->OnError(status);
}
},
DEBUG_LOCATION);
}
void XdsClient::NotifyWatchersOnErrorLocked(
const std::map<ResourceWatcherInterface*,
RefCountedPtr<ResourceWatcherInterface>>& watchers,
@ -1871,8 +1820,7 @@ void XdsClient::NotifyWatchersOnErrorLocked(
absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
}
work_serializer_.Schedule(
[watchers, status = std::move(status)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
for (const auto& p : watchers) {
p.first->OnError(status);
}

@ -186,9 +186,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
void MaybeStartLrsCall();
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// Returns non-OK if there has been an error since the last time the
// ADS stream saw a response.
const absl::Status& status() const { return status_; }
bool HasAdsCall() const;
bool HasActiveAdsCall() const;
void SubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name)
@ -199,11 +198,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
void OnConnectivityFailure(absl::Status status);
// Enqueues error notifications to watchers. Caller must drain
// XdsClient::work_serializer_ after releasing the lock.
void SetChannelStatusLocked(absl::Status status)
void OnConnectivityStateChange(absl::Status status);
void OnConnectivityStateChangeLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// The owning xds client.
@ -222,8 +218,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Stores the most recent accepted resource version for each resource type.
std::map<const XdsResourceType*, std::string /*version*/>
resource_type_version_map_;
absl::Status status_;
};
struct ResourceState {
@ -265,6 +259,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
LoadReportMap load_report_map;
};
// Sends an error notification to all watchers.
void NotifyOnErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Sends an error notification to a specific set of watchers.
void NotifyWatchersOnErrorLocked(
const std::map<ResourceWatcherInterface*,

@ -24,8 +24,6 @@
#include <memory>
#include <utility>
#include "absl/strings/str_cat.h"
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
@ -237,9 +235,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
on_connectivity_failure_(absl::Status(
status.code(),
absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
on_connectivity_failure_(status);
}
}

@ -51,7 +51,7 @@
// struct Foo {
// int a;
// int b;
// static const JsonLoaderInterface* JsonLoader(const JsonArgs& args) {
// static const JsonLoaderInterface* JsonLoader() {
// // Note: Field names must be string constants; they are not copied.
// static const auto* loader = JsonObjectLoader<Foo>()
// .Field("a", &Foo::a)
@ -60,10 +60,7 @@
// return loader;
// }
// // Optional; omit if no post-processing needed.
// void JsonPostLoad(const Json& source, const JsonArgs& args,
// ErrorList* errors) {
// ++a;
// }
// void JsonPostLoad(const Json& source, ErrorList* errors) { ++a; }
// };
// Now we can load Foo objects from JSON:
// absl::StatusOr<Foo> foo = LoadFromJson<Foo>(json);

@ -12,12 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
"grpc_cc_test",
"grpc_package",
)
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
grpc_package(name = "test/core/xds")
@ -121,36 +116,3 @@ grpc_cc_test(
"//test/cpp/util:grpc_cli_utils",
],
)
grpc_cc_library(
name = "xds_transport_fake",
testonly = True,
srcs = ["xds_transport_fake.cc"],
hdrs = ["xds_transport_fake.h"],
external_deps = [
"absl/strings",
"absl/types:optional",
],
language = "C++",
deps = [
"//:orphanable",
"//:ref_counted_ptr",
"//:xds_client",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "xds_client_test",
srcs = ["xds_client_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = True,
uses_polling = False,
deps = [
":xds_transport_fake",
"//:xds_client",
"//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/util:grpc_test_util",
],
)

File diff suppressed because it is too large Load Diff

@ -1,230 +0,0 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "test/core/xds/xds_transport_fake.h"
#include <functional>
#include <memory>
#include <utility>
#include <grpc/event_engine/event_engine.h>
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
namespace grpc_core {
//
// FakeXdsTransportFactory::FakeStreamingCall
//
void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
{
MutexLock lock(&mu_);
// Can't call event_handler_->OnStatusReceived() or unref event_handler_
// synchronously, since those operations will trigger code in
// XdsClient that acquires its mutex, but it was already holding its
// mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_),
status_sent = status_sent_]() mutable {
ExecCtx exec_ctx;
if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
event_handler.reset();
});
status_sent_ = true;
}
transport_->RemoveStream(method_, this);
Unref();
}
void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
std::string payload) {
MutexLock lock(&mu_);
from_client_messages_.push_back(std::move(payload));
cv_.Signal();
// Can't call event_handler_->OnRequestSent() synchronously, since that
// operation will trigger code in XdsClient that acquires its mutex, but it
// was already holding its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run(
[event_handler = event_handler_->Ref()]() mutable {
ExecCtx exec_ctx;
event_handler->OnRequestSent(/*ok=*/true);
event_handler.reset();
});
}
bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
MutexLock lock(&mu_);
return !from_client_messages_.empty();
}
absl::optional<std::string>
FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient(
absl::Duration timeout) {
MutexLock lock(&mu_);
while (from_client_messages_.empty()) {
if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
return absl::nullopt;
}
}
std::string payload = from_client_messages_.front();
from_client_messages_.pop_front();
return payload;
}
void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
absl::string_view payload) {
ExecCtx exec_ctx;
RefCountedPtr<RefCountedEventHandler> event_handler;
{
MutexLock lock(&mu_);
event_handler = event_handler_->Ref();
}
event_handler->OnRecvMessage(payload);
}
void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
absl::Status status) {
ExecCtx exec_ctx;
RefCountedPtr<RefCountedEventHandler> event_handler;
{
MutexLock lock(&mu_);
if (status_sent_) return;
status_sent_ = true;
event_handler = event_handler_->Ref();
}
event_handler->OnStatusReceived(std::move(status));
}
//
// FakeXdsTransportFactory::FakeXdsTransport
//
void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
absl::Status status) {
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
{
MutexLock lock(&mu_);
on_connectivity_failure = on_connectivity_failure_->Ref();
}
ExecCtx exec_ctx;
if (on_connectivity_failure != nullptr) {
on_connectivity_failure->Run(std::move(status));
}
}
void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
{
MutexLock lock(&mu_);
// Can't destroy on_connectivity_failure_ synchronously, since that
// operation will trigger code in XdsClient that acquires its mutex, but
// it was already holding its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([on_connectivity_failure = std::move(
on_connectivity_failure_)]() mutable {
ExecCtx exec_ctx;
on_connectivity_failure.reset();
});
}
Unref();
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(
const char* method, absl::Duration timeout) {
MutexLock lock(&mu_);
auto it = active_calls_.find(method);
while (it == active_calls_.end() || it->second == nullptr) {
if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
return nullptr;
}
it = active_calls_.find(method);
}
return it->second;
}
void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
const char* method, FakeStreamingCall* call) {
MutexLock lock(&mu_);
auto it = active_calls_.find(method);
if (it != active_calls_.end() && it->second.get() == call) {
active_calls_.erase(it);
}
}
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler) {
auto call = MakeOrphanable<FakeStreamingCall>(Ref(), method,
std::move(event_handler));
MutexLock lock(&mu_);
active_calls_[method] = call->Ref();
cv_.Signal();
return call;
}
//
// FakeXdsTransportFactory
//
constexpr char FakeXdsTransportFactory::kAdsMethod[];
constexpr char FakeXdsTransportFactory::kAdsV2Method[];
OrphanablePtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
absl::Status* /*status*/) {
auto transport =
MakeOrphanable<FakeXdsTransport>(std::move(on_connectivity_failure));
MutexLock lock(&mu_);
auto& entry = transport_map_[&server];
GPR_ASSERT(entry == nullptr);
entry = transport->Ref();
return transport;
}
void FakeXdsTransportFactory::TriggerConnectionFailure(
const XdsBootstrap::XdsServer& server, absl::Status status) {
auto transport = GetTransport(server);
transport->TriggerConnectionFailure(std::move(status));
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
const char* method,
absl::Duration timeout) {
auto transport = GetTransport(server);
return transport->WaitForStream(method, timeout);
}
RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
MutexLock lock(&mu_);
RefCountedPtr<FakeXdsTransport> transport = transport_map_[&server];
GPR_ASSERT(transport != nullptr);
return transport;
}
} // namespace grpc_core

@ -1,180 +0,0 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H
#include <grpc/support/port_platform.h>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_transport.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
class FakeXdsTransportFactory : public XdsTransportFactory {
private:
class FakeXdsTransport;
public:
static constexpr char kAdsMethod[] =
"/envoy.service.discovery.v3.AggregatedDiscoveryService/"
"StreamAggregatedResources";
static constexpr char kAdsV2Method[] =
"/envoy.service.discovery.v2.AggregatedDiscoveryService/"
"StreamAggregatedResources";
class FakeStreamingCall : public XdsTransport::StreamingCall {
public:
FakeStreamingCall(
RefCountedPtr<FakeXdsTransport> transport, const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler)
: transport_(std::move(transport)),
method_(method),
event_handler_(MakeRefCounted<RefCountedEventHandler>(
std::move(event_handler))) {}
void Orphan() override;
using StreamingCall::Ref; // Make it public.
bool HaveMessageFromClient();
absl::optional<std::string> WaitForMessageFromClient(
absl::Duration timeout);
void SendMessageToClient(absl::string_view payload);
void MaybeSendStatusToClient(absl::Status status);
private:
class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> {
public:
explicit RefCountedEventHandler(
std::unique_ptr<StreamingCall::EventHandler> event_handler)
: event_handler_(std::move(event_handler)) {}
void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); }
void OnRecvMessage(absl::string_view payload) {
event_handler_->OnRecvMessage(payload);
}
void OnStatusReceived(absl::Status status) {
event_handler_->OnStatusReceived(std::move(status));
}
private:
std::unique_ptr<StreamingCall::EventHandler> event_handler_;
};
void SendMessage(std::string payload) override;
RefCountedPtr<FakeXdsTransport> transport_;
const char* method_;
Mutex mu_;
CondVar cv_;
RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_);
std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_);
bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;
};
FakeXdsTransportFactory() = default;
using XdsTransportFactory::Ref; // Make it public.
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
absl::Status status);
RefCountedPtr<FakeStreamingCall> WaitForStream(
const XdsBootstrap::XdsServer& server, const char* method,
absl::Duration timeout);
void Orphan() override { Unref(); }
private:
class FakeXdsTransport : public XdsTransport {
public:
explicit FakeXdsTransport(
std::function<void(absl::Status)> on_connectivity_failure)
: on_connectivity_failure_(
MakeRefCounted<RefCountedOnConnectivityFailure>(
std::move(on_connectivity_failure))) {}
void Orphan() override;
using XdsTransport::Ref; // Make it public.
void TriggerConnectionFailure(absl::Status status);
RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method,
absl::Duration timeout);
void RemoveStream(const char* method, FakeStreamingCall* call);
private:
class RefCountedOnConnectivityFailure
: public RefCounted<RefCountedOnConnectivityFailure> {
public:
explicit RefCountedOnConnectivityFailure(
std::function<void(absl::Status)> on_connectivity_failure)
: on_connectivity_failure_(std::move(on_connectivity_failure)) {}
void Run(absl::Status status) {
on_connectivity_failure_(std::move(status));
}
private:
std::function<void(absl::Status)> on_connectivity_failure_;
};
OrphanablePtr<StreamingCall> CreateStreamingCall(
const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
void ResetBackoff() override {}
Mutex mu_;
CondVar cv_;
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_
ABSL_GUARDED_BY(&mu_);
std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
active_calls_ ABSL_GUARDED_BY(&mu_);
};
OrphanablePtr<XdsTransport> Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
absl::Status* status) override;
RefCountedPtr<FakeXdsTransport> GetTransport(
const XdsBootstrap::XdsServer& server);
Mutex mu_;
std::map<const XdsBootstrap::XdsServer*, RefCountedPtr<FakeXdsTransport>>
transport_map_ ABSL_GUARDED_BY(&mu_);
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H

@ -87,10 +87,10 @@ TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
const auto response_state =
WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: [resource index 0: cluster_name: "
"INVALID_ARGUMENT: errors parsing CDS resource: ["
"DiscoveryType is not valid.]]");
EXPECT_THAT(response_state->error_message,
::testing::ContainsRegex(absl::StrCat(
kDefaultClusterName,
": validation error.*DiscoveryType is not valid")));
CheckRpcSendOk(DEBUG_LOCATION);
}

@ -182,17 +182,18 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) {
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(
response_state->error_message,
absl::StrCat("xDS response validation errors: ["
absl::StrCat(
"xDS response validation errors: ["
"resource index 0: Can't decode Resource proto wrapper; ",
"resource index 1: foo: "
"INVALID_ARGUMENT: Can't parse Cluster resource.; "
"resource index 3: ",
kClusterName2,
": INVALID_ARGUMENT: errors parsing CDS resource: "
": validation error: INVALID_ARGUMENT: errors parsing CDS resource: "
"[DiscoveryType is not valid.]; "
"resource index 4: ",
kClusterName3,
": INVALID_ARGUMENT: errors parsing CDS resource: "
": validation error: INVALID_ARGUMENT: errors parsing CDS resource: "
"[DiscoveryType is not valid.]]"));
// RPCs for default cluster should succeed.
std::vector<std::pair<std::string, std::string>> metadata_default_cluster = {
@ -311,10 +312,11 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) {
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForLdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: server.example.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
EXPECT_THAT(response_state->error_message,
::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener.*")));
// Need to create a second channel to subscribe to a second LDS resource.
auto channel2 = CreateChannel(0, kServerName2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
@ -328,13 +330,16 @@ TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) {
// Wait for second NACK to be reported to xDS server.
const auto response_state = WaitForLdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(
response_state->error_message,
"xDS response validation errors: ["
"resource index 0: server.other.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener; "
"resource index 1: server.example.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
EXPECT_THAT(response_state->error_message,
::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener.*")));
EXPECT_THAT(response_state->error_message,
::testing::ContainsRegex(absl::StrCat(
kServerName2,
": validation error.*"
"Listener has neither address nor ApiListener.*")));
}
// Now start a new channel with a third server name, this one with a
// valid resource.
@ -366,10 +371,11 @@ TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) {
const auto response_state =
WaitForLdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: server.example.com: "
"INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
EXPECT_THAT(response_state->error_message,
::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener")));
CheckRpcSendOk(DEBUG_LOCATION);
}

@ -7891,30 +7891,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "xds_client_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save