[xds_client_test] change test to use FuzzingEventEngine (#37668)

Closes #37668

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37668 from markdroth:xds_client_test_fuzzing_ee e1fe7f56c2
PiperOrigin-RevId: 681150007
pull/37820/head
Mark D. Roth 4 months ago committed by Copybara-Service
parent 74e640341d
commit 9a12ec91e1
  1. 6
      CMakeLists.txt
  2. 5
      build_autogenerated.yaml
  3. 17
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  4. 5
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  5. 9
      test/core/xds/BUILD
  6. 40
      test/core/xds/xds_client_fuzzer.cc
  7. 2
      test/core/xds/xds_client_fuzzer.proto
  8. 267
      test/core/xds/xds_client_test.cc
  9. 79
      test/core/xds/xds_transport_fake.cc
  10. 44
      test/core/xds/xds_transport_fake.h

6
CMakeLists.txt generated

@ -34193,8 +34193,14 @@ add_executable(xds_client_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
src/cpp/client/global_callback_hook.cc src/cpp/client/global_callback_hook.cc
src/cpp/util/status.cc src/cpp/util/status.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/xds/xds_client_test.cc test/core/xds/xds_client_test.cc
test/core/xds/xds_transport_fake.cc test/core/xds/xds_transport_fake.cc
) )

@ -21562,6 +21562,8 @@ targets:
build: test build: test
language: c++ language: c++
headers: headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/test_util/scoped_env_var.h - test/core/test_util/scoped_env_var.h
- test/core/xds/xds_client_test_peer.h - test/core/xds/xds_client_test_peer.h
- test/core/xds/xds_transport_fake.h - test/core/xds/xds_transport_fake.h
@ -21569,8 +21571,11 @@ targets:
- src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/discovery.proto - src/proto/grpc/testing/xds/v3/discovery.proto
- src/proto/grpc/testing/xds/v3/percent.proto - src/proto/grpc/testing/xds/v3/percent.proto
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/cpp/client/global_callback_hook.cc - src/cpp/client/global_callback_hook.cc
- src/cpp/util/status.cc - src/cpp/util/status.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/xds/xds_client_test.cc - test/core/xds/xds_client_test.cc
- test/core/xds/xds_transport_fake.cc - test/core/xds/xds_transport_fake.cc
deps: deps:

@ -194,16 +194,23 @@ void FuzzingEventEngine::TickUntilIdle() {
<< "TickUntilIdle: " << "TickUntilIdle: "
<< GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(), << GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(),
outstanding_writes_.load()); outstanding_writes_.load());
if (tasks_by_id_.empty() && if (IsIdleLocked()) return;
outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
outstanding_reads_.load(std::memory_order_relaxed) == 0) {
return;
}
} }
Tick(); Tick();
} }
} }
bool FuzzingEventEngine::IsIdle() {
grpc_core::MutexLock lock(&*mu_);
return IsIdleLocked();
}
bool FuzzingEventEngine::IsIdleLocked() {
return tasks_by_id_.empty() &&
outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
outstanding_reads_.load(std::memory_order_relaxed) == 0;
}
void FuzzingEventEngine::TickUntil(Time t) { void FuzzingEventEngine::TickUntil(Time t) {
while (true) { while (true) {
auto now = Now(); auto now = Now();

@ -72,6 +72,8 @@ class FuzzingEventEngine : public EventEngine {
ABSL_LOCKS_EXCLUDED(mu_); ABSL_LOCKS_EXCLUDED(mu_);
// Repeatedly call Tick() until there is no more work to do. // Repeatedly call Tick() until there is no more work to do.
void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_); void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_);
// Returns true if idle.
bool IsIdle() ABSL_LOCKS_EXCLUDED(mu_);
// Tick until some time // Tick until some time
void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_); void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_);
// Tick for some duration // Tick for some duration
@ -296,6 +298,9 @@ class FuzzingEventEngine : public EventEngine {
int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Is the given port in use by any listener? // Is the given port in use by any listener?
bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool IsIdleLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// For the next connection being built, query the list of fuzzer selected // For the next connection being built, query the list of fuzzer selected
// write size limits. // write size limits.
std::queue<size_t> WriteSizesForConnection() std::queue<size_t> WriteSizesForConnection()

@ -145,6 +145,7 @@ grpc_cc_library(
"//:orphanable", "//:orphanable",
"//:ref_counted_ptr", "//:ref_counted_ptr",
"//:xds_client", "//:xds_client",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:grpc_test_util", "//test/core/test_util:grpc_test_util",
], ],
) )
@ -166,7 +167,6 @@ grpc_cc_test(
srcs = ["xds_client_test.cc"], srcs = ["xds_client_test.cc"],
external_deps = ["gtest"], external_deps = ["gtest"],
language = "C++", language = "C++",
shard_count = 10,
uses_event_engine = True, uses_event_engine = True,
uses_polling = False, uses_polling = False,
deps = [ deps = [
@ -174,6 +174,8 @@ grpc_cc_test(
":xds_transport_fake", ":xds_transport_fake",
"//:xds_client", "//:xds_client",
"//src/proto/grpc/testing/xds/v3:discovery_proto", "//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:grpc_test_util", "//test/core/test_util:grpc_test_util",
"//test/core/test_util:scoped_env_var", "//test/core/test_util:scoped_env_var",
], ],
@ -188,9 +190,10 @@ grpc_proto_fuzzer(
proto = "xds_client_fuzzer.proto", proto = "xds_client_fuzzer.proto",
proto_deps = [ proto_deps = [
"//src/proto/grpc/testing/xds/v3:discovery_proto", "//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
], ],
tags = ["no_windows"], tags = ["no_windows"],
uses_event_engine = False, uses_event_engine = True,
uses_polling = False, uses_polling = False,
deps = [ deps = [
":xds_client_test_peer", ":xds_client_test_peer",
@ -206,6 +209,8 @@ grpc_proto_fuzzer(
"//src/proto/grpc/testing/xds/v3:endpoint_proto", "//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//src/proto/grpc/testing/xds/v3:http_connection_manager_proto", "//src/proto/grpc/testing/xds/v3:http_connection_manager_proto",
"//src/proto/grpc/testing/xds/v3:router_proto", "//src/proto/grpc/testing/xds/v3:router_proto",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine:event_engine_test_utils",
], ],
) )

@ -30,7 +30,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/util/orphanable.h" #include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h" #include "src/core/util/ref_counted_ptr.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
@ -46,15 +46,24 @@
#include "src/core/xds/xds_client/xds_client.h" #include "src/core/xds/xds_client/xds_client.h"
#include "src/libfuzzer/libfuzzer_macro.h" #include "src/libfuzzer/libfuzzer_macro.h"
#include "src/proto/grpc/testing/xds/v3/discovery.pb.h" #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/xds/xds_client_fuzzer.pb.h" #include "test/core/xds/xds_client_fuzzer.pb.h"
#include "test/core/xds/xds_client_test_peer.h" #include "test/core/xds/xds_client_test_peer.h"
#include "test/core/xds/xds_transport_fake.h" #include "test/core/xds/xds_transport_fake.h"
using grpc_event_engine::experimental::FuzzingEventEngine;
namespace grpc_core { namespace grpc_core {
class Fuzzer { class Fuzzer {
public: public:
explicit Fuzzer(absl::string_view bootstrap_json) { Fuzzer(absl::string_view bootstrap_json,
const fuzzing_event_engine::Actions& fuzzing_ee_actions) {
event_engine_ = std::make_shared<FuzzingEventEngine>(
FuzzingEventEngine::Options(), fuzzing_ee_actions);
grpc_timer_manager_set_start_threaded(false);
grpc_init();
auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json); auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
if (!bootstrap.ok()) { if (!bootstrap.ok()) {
LOG(ERROR) << "error creating bootstrap: " << bootstrap.status(); LOG(ERROR) << "error creating bootstrap: " << bootstrap.status();
@ -62,15 +71,25 @@ class Fuzzer {
return; return;
} }
transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>( transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
[]() { Crash("Multiple concurrent reads"); }); []() { Crash("Multiple concurrent reads"); }, event_engine_);
transport_factory_->SetAutoCompleteMessagesFromClient(false); transport_factory_->SetAutoCompleteMessagesFromClient(false);
transport_factory_->SetAbortOnUndrainedMessages(false); transport_factory_->SetAbortOnUndrainedMessages(false);
xds_client_ = MakeRefCounted<XdsClient>( xds_client_ = MakeRefCounted<XdsClient>(
std::move(*bootstrap), transport_factory_, std::move(*bootstrap), transport_factory_, event_engine_,
grpc_event_engine::experimental::GetDefaultEventEngine(),
/*metrics_reporter=*/nullptr, "foo agent", "foo version"); /*metrics_reporter=*/nullptr, "foo agent", "foo version");
} }
~Fuzzer() {
transport_factory_.reset();
xds_client_.reset();
event_engine_->FuzzingDone();
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
grpc_event_engine::experimental::WaitForSingleOwner(
std::move(event_engine_));
grpc_shutdown_blocking();
}
void Act(const xds_client_fuzzer::Action& action) { void Act(const xds_client_fuzzer::Action& action) {
if (xds_client_ == nullptr) return; if (xds_client_ == nullptr) return;
switch (action.action_type_case()) { switch (action.action_type_case()) {
@ -276,8 +295,7 @@ class Fuzzer {
if (xds_server == nullptr) return nullptr; if (xds_server == nullptr) return nullptr;
const char* method = StreamIdMethod(stream_id); const char* method = StreamIdMethod(stream_id);
if (method == nullptr) return nullptr; if (method == nullptr) return nullptr;
return transport_factory_->WaitForStream(*xds_server, method, return transport_factory_->WaitForStream(*xds_server, method);
absl::ZeroDuration());
} }
static std::string StreamIdString( static std::string StreamIdString(
@ -293,7 +311,7 @@ class Fuzzer {
auto stream = GetStream(stream_id); auto stream = GetStream(stream_id);
if (stream == nullptr) return; if (stream == nullptr) return;
LOG(INFO) << " stream=" << stream.get(); LOG(INFO) << " stream=" << stream.get();
auto message = stream->WaitForMessageFromClient(absl::ZeroDuration()); auto message = stream->WaitForMessageFromClient();
if (message.has_value()) { if (message.has_value()) {
LOG(INFO) << " completing send_message"; LOG(INFO) << " completing send_message";
stream->CompleteSendMessageFromClient(ok); stream->CompleteSendMessageFromClient(ok);
@ -320,6 +338,7 @@ class Fuzzer {
stream->MaybeSendStatusToClient(std::move(status)); stream->MaybeSendStatusToClient(std::move(status));
} }
std::shared_ptr<FuzzingEventEngine> event_engine_;
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
RefCountedPtr<FakeXdsTransportFactory> transport_factory_; RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
@ -336,10 +355,9 @@ class Fuzzer {
bool squelch = true; bool squelch = true;
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg& message) { DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg& message) {
grpc_init(); grpc_core::Fuzzer fuzzer(message.bootstrap(),
grpc_core::Fuzzer fuzzer(message.bootstrap()); message.fuzzing_event_engine_actions());
for (int i = 0; i < message.actions_size(); i++) { for (int i = 0; i < message.actions_size(); i++) {
fuzzer.Act(message.actions(i)); fuzzer.Act(message.actions(i));
} }
grpc_shutdown();
} }

@ -19,6 +19,7 @@ syntax = "proto3";
package xds_client_fuzzer; package xds_client_fuzzer;
import "src/proto/grpc/testing/xds/v3/discovery.proto"; import "src/proto/grpc/testing/xds/v3/discovery.proto";
import "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto";
// We'd ideally like to use google.rpc.Status instead of creating our // We'd ideally like to use google.rpc.Status instead of creating our
// own proto for this, but that winds up causing all sorts of dependency // own proto for this, but that winds up causing all sorts of dependency
@ -120,4 +121,5 @@ message Action {
message Msg { message Msg {
string bootstrap = 1; string bootstrap = 1;
repeated Action actions = 2; repeated Action actions = 2;
fuzzing_event_engine.Actions fuzzing_event_engine_actions = 3;
} }

@ -46,7 +46,7 @@
#include <grpc/support/json.h> #include <grpc/support/json.h>
#include <grpcpp/impl/codegen/config_protobuf.h> #include <grpcpp/impl/codegen/config_protobuf.h>
#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/util/debug_location.h" #include "src/core/util/debug_location.h"
#include "src/core/util/json/json.h" #include "src/core/util/json/json.h"
#include "src/core/util/json/json_args.h" #include "src/core/util/json/json_args.h"
@ -59,6 +59,8 @@
#include "src/core/xds/xds_client/xds_resource_type_impl.h" #include "src/core/xds/xds_client/xds_resource_type_impl.h"
#include "src/proto/grpc/testing/xds/v3/base.pb.h" #include "src/proto/grpc/testing/xds/v3/base.pb.h"
#include "src/proto/grpc/testing/xds/v3/discovery.pb.h" #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/test_util/scoped_env_var.h" #include "test/core/test_util/scoped_env_var.h"
#include "test/core/test_util/test_config.h" #include "test/core/test_util/test_config.h"
#include "test/core/xds/xds_client_test_peer.h" #include "test/core/xds/xds_client_test_peer.h"
@ -73,6 +75,7 @@
using envoy::service::discovery::v3::DiscoveryRequest; using envoy::service::discovery::v3::DiscoveryRequest;
using envoy::service::discovery::v3::DiscoveryResponse; using envoy::service::discovery::v3::DiscoveryResponse;
using grpc_event_engine::experimental::FuzzingEventEngine;
namespace grpc_core { namespace grpc_core {
namespace testing { namespace testing {
@ -252,6 +255,9 @@ class XdsClientTest : public ::testing::Test {
all_resources_required_in_sotw>, all_resources_required_in_sotw>,
ResourceStruct>::WatcherInterface { ResourceStruct>::WatcherInterface {
public: public:
explicit Watcher(std::shared_ptr<FuzzingEventEngine> event_engine)
: event_engine_(std::move(event_engine)) {}
~Watcher() override { ~Watcher() override {
MutexLock lock(&mu_); MutexLock lock(&mu_);
EXPECT_THAT(queue_, ::testing::IsEmpty()) EXPECT_THAT(queue_, ::testing::IsEmpty())
@ -270,42 +276,46 @@ class XdsClientTest : public ::testing::Test {
}); });
} }
// Returns true if no event is received during the timeout period.
bool ExpectNoEvent(absl::Duration timeout) {
MutexLock lock(&mu_);
return !WaitForEventLocked(timeout);
}
bool HasEvent() { bool HasEvent() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
return !queue_.empty(); return !queue_.empty();
} }
// Returns true if no event is received after draining the fuzzing
// EE queue.
bool ExpectNoEvent() {
event_engine_->TickUntilIdle();
return !HasEvent();
}
absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle( absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) { SourceLocation location = SourceLocation()) {
MutexLock lock(&mu_); while (true) {
if (!WaitForEventLocked(timeout)) return absl::nullopt; event_engine_->Tick();
Event& event = queue_.front(); MutexLock lock(&mu_);
if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) { if (queue_.empty()) {
EXPECT_TRUE(false) if (event_engine_->IsIdle()) return absl::nullopt;
<< "got unexpected event " continue;
<< (absl::holds_alternative<absl::Status>(event) }
? "error" Event& event = queue_.front();
: "does-not-exist") if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
<< " at " << location.file() << ":" << location.line(); EXPECT_TRUE(false)
return absl::nullopt; << "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
queue_.pop_front();
return foo;
} }
auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
queue_.pop_front();
return foo;
} }
std::shared_ptr<const ResourceStruct> WaitForNextResource( std::shared_ptr<const ResourceStruct> WaitForNextResource(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) { SourceLocation location = SourceLocation()) {
auto resource_and_handle = auto resource_and_handle = WaitForNextResourceAndHandle(location);
WaitForNextResourceAndHandle(timeout, location);
if (!resource_and_handle.has_value()) { if (!resource_and_handle.has_value()) {
return nullptr; return nullptr;
} }
@ -313,40 +323,50 @@ class XdsClientTest : public ::testing::Test {
} }
absl::optional<absl::Status> WaitForNextError( absl::optional<absl::Status> WaitForNextError(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) { SourceLocation location = SourceLocation()) {
MutexLock lock(&mu_); while (true) {
if (!WaitForEventLocked(timeout)) return absl::nullopt; event_engine_->Tick();
Event& event = queue_.front(); MutexLock lock(&mu_);
if (!absl::holds_alternative<absl::Status>(event)) { if (queue_.empty()) {
EXPECT_TRUE(false) if (event_engine_->IsIdle()) return absl::nullopt;
<< "got unexpected event " continue;
<< (absl::holds_alternative<ResourceAndReadDelayHandle>(event) }
? "resource" Event& event = queue_.front();
: "does-not-exist") if (!absl::holds_alternative<absl::Status>(event)) {
<< " at " << location.file() << ":" << location.line(); EXPECT_TRUE(false)
return absl::nullopt; << "got unexpected event "
<< (absl::holds_alternative<ResourceAndReadDelayHandle>(event)
? "resource"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
absl::Status error = std::move(absl::get<absl::Status>(event));
queue_.pop_front();
return std::move(error);
} }
absl::Status error = std::move(absl::get<absl::Status>(event));
queue_.pop_front();
return std::move(error);
} }
bool WaitForDoesNotExist(absl::Duration timeout, bool WaitForDoesNotExist(SourceLocation location = SourceLocation()) {
SourceLocation location = SourceLocation()) { while (true) {
MutexLock lock(&mu_); event_engine_->Tick();
if (!WaitForEventLocked(timeout)) return false; MutexLock lock(&mu_);
Event& event = queue_.front(); if (queue_.empty()) {
if (!absl::holds_alternative<DoesNotExist>(event)) { if (event_engine_->IsIdle()) return false;
EXPECT_TRUE(false) continue;
<< "got unexpected event " }
<< (absl::holds_alternative<absl::Status>(event) ? "error" Event& event = queue_.front();
: "resource") if (!absl::holds_alternative<DoesNotExist>(event)) {
<< " at " << location.file() << ":" << location.line(); EXPECT_TRUE(false)
return false; << "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event) ? "error"
: "resource")
<< " at " << location.file() << ":" << location.line();
return false;
}
queue_.pop_front();
return true;
} }
queue_.pop_front();
return true;
} }
private: private:
@ -361,7 +381,6 @@ class XdsClientTest : public ::testing::Test {
ResourceAndReadDelayHandle event_details = { ResourceAndReadDelayHandle event_details = {
std::move(foo), std::move(read_delay_handle)}; std::move(foo), std::move(read_delay_handle)};
queue_.emplace_back(std::move(event_details)); queue_.emplace_back(std::move(event_details));
cv_.Signal();
} }
void OnError( void OnError(
@ -370,7 +389,6 @@ class XdsClientTest : public ::testing::Test {
override { override {
MutexLock lock(&mu_); MutexLock lock(&mu_);
queue_.push_back(std::move(status)); queue_.push_back(std::move(status));
cv_.Signal();
} }
void OnResourceDoesNotExist( void OnResourceDoesNotExist(
@ -378,24 +396,11 @@ class XdsClientTest : public ::testing::Test {
override { override {
MutexLock lock(&mu_); MutexLock lock(&mu_);
queue_.push_back(DoesNotExist()); queue_.push_back(DoesNotExist());
cv_.Signal();
} }
// Returns true if an event was received, or false if the timeout std::shared_ptr<FuzzingEventEngine> event_engine_;
// expires before any event is received.
bool WaitForEventLocked(absl::Duration timeout)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
while (queue_.empty()) {
if (cv_.WaitWithTimeout(&mu_,
timeout * grpc_test_slowdown_factor())) {
return false;
}
}
return true;
}
Mutex mu_; Mutex mu_;
CondVar cv_;
std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_); std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
}; };
@ -626,6 +631,9 @@ class XdsClientTest : public ::testing::Test {
uint64_t>; uint64_t>;
using ServerFailureMap = std::map<std::string /*xds_server*/, uint64_t>; using ServerFailureMap = std::map<std::string /*xds_server*/, uint64_t>;
explicit MetricsReporter(std::shared_ptr<FuzzingEventEngine> event_engine)
: event_engine_(std::move(event_engine)) {}
ResourceUpdateMap resource_updates_valid() const { ResourceUpdateMap resource_updates_valid() const {
MutexLock lock(&mu_); MutexLock lock(&mu_);
return resource_updates_valid_; return resource_updates_valid_;
@ -643,12 +651,10 @@ class XdsClientTest : public ::testing::Test {
::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher, ::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher,
::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher, ::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher,
::testing::Matcher<ServerFailureMap> server_failures_matcher, ::testing::Matcher<ServerFailureMap> server_failures_matcher,
absl::Duration timeout = absl::Seconds(3),
SourceLocation location = SourceLocation()) { SourceLocation location = SourceLocation()) {
const absl::Time deadline =
absl::Now() + (timeout * grpc_test_slowdown_factor());
MutexLock lock(&mu_);
while (true) { while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (::testing::Matches(resource_updates_valid_matcher)( if (::testing::Matches(resource_updates_valid_matcher)(
resource_updates_valid_) && resource_updates_valid_) &&
::testing::Matches(resource_updates_invalid_matcher)( ::testing::Matches(resource_updates_invalid_matcher)(
@ -656,15 +662,15 @@ class XdsClientTest : public ::testing::Test {
::testing::Matches(server_failures_matcher)(server_failures_)) { ::testing::Matches(server_failures_matcher)(server_failures_)) {
return true; return true;
} }
if (cond_.WaitWithDeadline(&mu_, deadline)) break; if (!event_engine_->IsIdle()) continue;
EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(resource_updates_invalid_, resource_updates_invalid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(server_failures_, server_failures_matcher)
<< location.file() << ":" << location.line();
return false;
} }
EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(resource_updates_invalid_, resource_updates_invalid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(server_failures_, server_failures_matcher)
<< location.file() << ":" << location.line();
return false;
} }
private: private:
@ -690,6 +696,8 @@ class XdsClientTest : public ::testing::Test {
cond_.SignalAll(); cond_.SignalAll();
} }
std::shared_ptr<FuzzingEventEngine> event_engine_;
mutable Mutex mu_; mutable Mutex mu_;
ResourceUpdateMap resource_updates_valid_ ABSL_GUARDED_BY(mu_); ResourceUpdateMap resource_updates_valid_ ABSL_GUARDED_BY(mu_);
ResourceUpdateMap resource_updates_invalid_ ABSL_GUARDED_BY(mu_); ResourceUpdateMap resource_updates_invalid_ ABSL_GUARDED_BY(mu_);
@ -724,18 +732,35 @@ class XdsClientTest : public ::testing::Test {
return server_connection_map; return server_connection_map;
} }
void SetUp() override {
event_engine_ = std::make_shared<FuzzingEventEngine>(
FuzzingEventEngine::Options(), fuzzing_event_engine::Actions());
grpc_timer_manager_set_start_threaded(false);
grpc_init();
}
void TearDown() override {
transport_factory_.reset();
xds_client_.reset();
event_engine_->FuzzingDone();
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
grpc_event_engine::experimental::WaitForSingleOwner(
std::move(event_engine_));
grpc_shutdown_blocking();
}
// Sets transport_factory_ and initializes xds_client_ with the // Sets transport_factory_ and initializes xds_client_ with the
// specified bootstrap config. // specified bootstrap config.
void InitXdsClient( void InitXdsClient(
FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
Duration resource_request_timeout = Duration::Seconds(15)) { Duration resource_request_timeout = Duration::Seconds(15)) {
transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>( transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
[]() { FAIL() << "Multiple concurrent reads"; }); []() { FAIL() << "Multiple concurrent reads"; }, event_engine_);
auto metrics_reporter = std::make_unique<MetricsReporter>(); auto metrics_reporter = std::make_unique<MetricsReporter>(event_engine_);
metrics_reporter_ = metrics_reporter.get(); metrics_reporter_ = metrics_reporter.get();
xds_client_ = MakeRefCounted<XdsClient>( xds_client_ = MakeRefCounted<XdsClient>(
bootstrap_builder.Build(), transport_factory_, bootstrap_builder.Build(), transport_factory_, event_engine_,
grpc_event_engine::experimental::GetDefaultEventEngine(),
std::move(metrics_reporter), "foo agent", "foo version", std::move(metrics_reporter), "foo agent", "foo version",
resource_request_timeout * grpc_test_slowdown_factor()); resource_request_timeout * grpc_test_slowdown_factor());
} }
@ -743,7 +768,7 @@ class XdsClientTest : public ::testing::Test {
// Starts and cancels a watch for a Foo resource. // Starts and cancels a watch for a Foo resource.
RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch( RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch(
absl::string_view resource_name) { absl::string_view resource_name) {
auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>(); auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>(event_engine_);
XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
return watcher; return watcher;
} }
@ -757,7 +782,7 @@ class XdsClientTest : public ::testing::Test {
// Starts and cancels a watch for a Bar resource. // Starts and cancels a watch for a Bar resource.
RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch( RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch(
absl::string_view resource_name) { absl::string_view resource_name) {
auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>(); auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>(event_engine_);
XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
return watcher; return watcher;
} }
@ -771,7 +796,8 @@ class XdsClientTest : public ::testing::Test {
// Starts and cancels a watch for a WildcardCapable resource. // Starts and cancels a watch for a WildcardCapable resource.
RefCountedPtr<XdsWildcardCapableResourceType::Watcher> RefCountedPtr<XdsWildcardCapableResourceType::Watcher>
StartWildcardCapableWatch(absl::string_view resource_name) { StartWildcardCapableWatch(absl::string_view resource_name) {
auto watcher = MakeRefCounted<XdsWildcardCapableResourceType::Watcher>(); auto watcher =
MakeRefCounted<XdsWildcardCapableResourceType::Watcher>(event_engine_);
XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name, XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name,
watcher); watcher);
return watcher; return watcher;
@ -784,11 +810,13 @@ class XdsClientTest : public ::testing::Test {
} }
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream( RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
const XdsBootstrap::XdsServer& xds_server, const XdsBootstrap::XdsServer& xds_server) {
absl::Duration timeout = absl::Seconds(5)) {
return transport_factory_->WaitForStream( return transport_factory_->WaitForStream(
xds_server, FakeXdsTransportFactory::kAdsMethod, xds_server, FakeXdsTransportFactory::kAdsMethod);
timeout * grpc_test_slowdown_factor()); }
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream() {
return WaitForAdsStream(*xds_client_->bootstrap().servers().front());
} }
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server, void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server,
@ -796,19 +824,11 @@ class XdsClientTest : public ::testing::Test {
transport_factory_->TriggerConnectionFailure(xds_server, std::move(status)); transport_factory_->TriggerConnectionFailure(xds_server, std::move(status));
} }
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
absl::Duration timeout = absl::Seconds(5)) {
return WaitForAdsStream(*xds_client_->bootstrap().servers().front(),
timeout);
}
// Gets the latest request sent to the fake xDS server. // Gets the latest request sent to the fake xDS server.
absl::optional<DiscoveryRequest> WaitForRequest( absl::optional<DiscoveryRequest> WaitForRequest(
FakeXdsTransportFactory::FakeStreamingCall* stream, FakeXdsTransportFactory::FakeStreamingCall* stream,
absl::Duration timeout = absl::Seconds(3),
SourceLocation location = SourceLocation()) { SourceLocation location = SourceLocation()) {
auto message = auto message = stream->WaitForMessageFromClient();
stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor());
if (!message.has_value()) return absl::nullopt; if (!message.has_value()) return absl::nullopt;
DiscoveryRequest request; DiscoveryRequest request;
bool success = request.ParseFromString(*message); bool success = request.ParseFromString(*message);
@ -893,6 +913,7 @@ class XdsClientTest : public ::testing::Test {
<< location.file() << ":" << location.line(); << location.file() << ":" << location.line();
} }
std::shared_ptr<FuzzingEventEngine> event_engine_;
RefCountedPtr<FakeXdsTransportFactory> transport_factory_; RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
MetricsReporter* metrics_reporter_ = nullptr; MetricsReporter* metrics_reporter_ = nullptr;
@ -2011,7 +2032,7 @@ TEST_F(XdsClientTest, ResourceDeletion) {
.set_nonce("B") .set_nonce("B")
.Serialize()); .Serialize());
// Watcher should see the does-not-exist event. // Watcher should see the does-not-exist event.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(1))); EXPECT_TRUE(watcher->WaitForDoesNotExist());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -2029,7 +2050,7 @@ TEST_F(XdsClientTest, ResourceDeletion) {
// Start a new watcher for the same resource. It should immediately // Start a new watcher for the same resource. It should immediately
// receive the same does-not-exist notification. // receive the same does-not-exist notification.
auto watcher2 = StartWildcardCapableWatch("wc1"); auto watcher2 = StartWildcardCapableWatch("wc1");
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); EXPECT_TRUE(watcher2->WaitForDoesNotExist());
// XdsClient should have sent an ACK message to the xDS server. // XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get()); request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value()); ASSERT_TRUE(request.has_value());
@ -2142,7 +2163,7 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
.Serialize()); .Serialize());
// Watcher should not see any update, since we should have ignored the // Watcher should not see any update, since we should have ignored the
// deletion. // deletion.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -2438,7 +2459,7 @@ TEST_F(XdsClientTest, ConnectionFails) {
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)))); ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
// We should not see a resource-does-not-exist event, because the // We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected. // timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Start a new watch. This watcher should be given the same error, // Start a new watch. This watcher should be given the same error,
// since we have not yet recovered. // since we have not yet recovered.
auto watcher2 = StartFooWatch("foo1"); auto watcher2 = StartFooWatch("foo1");
@ -2517,7 +2538,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
CheckRequestNode(*request); // Should be present on the first request. CheckRequestNode(*request); // Should be present on the first request.
// Do not send a response, but wait for the resource to be reported as // Do not send a response, but wait for the resource to be reported as
// not existing. // not existing.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); EXPECT_TRUE(watcher->WaitForDoesNotExist());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_)); ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
@ -2530,7 +2551,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
// Start a new watcher for the same resource. It should immediately // Start a new watcher for the same resource. It should immediately
// receive the same does-not-exist notification. // receive the same does-not-exist notification.
auto watcher2 = StartFooWatch("foo1"); auto watcher2 = StartFooWatch("foo1");
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); EXPECT_TRUE(watcher2->WaitForDoesNotExist());
// Now server sends a response. // Now server sends a response.
stream->SendMessageToClient( stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url()) ResponseBuilder(XdsFooResourceType::Get()->type_url())
@ -2641,7 +2662,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
CheckRequestNode(*request); // Should be present on the first request. CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send a response immediately. // Server does NOT send a response immediately.
// Client should receive a resource does-not-exist. // Client should receive a resource does-not-exist.
ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); ASSERT_TRUE(watcher->WaitForDoesNotExist());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_)); ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
@ -2713,7 +2734,7 @@ TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
// Server does NOT send a response. // Server does NOT send a response.
// We should not see a resource-does-not-exist event, because the // We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected. // timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Check metric data. // Check metric data.
EXPECT_THAT(GetResourceCounts(), EXPECT_THAT(GetResourceCounts(),
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -2726,7 +2747,7 @@ TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
stream->CompleteSendMessageFromClient(); stream->CompleteSendMessageFromClient();
// Server does NOT send a response. // Server does NOT send a response.
// Watcher should see a does-not-exist event. // Watcher should see a does-not-exist event.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); EXPECT_TRUE(watcher->WaitForDoesNotExist());
// Check metric data. // Check metric data.
EXPECT_THAT(GetResourceCounts(), EXPECT_THAT(GetResourceCounts(),
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -2915,7 +2936,7 @@ TEST_F(XdsClientTest,
/*resource_names=*/{"foo1", "foo2"}); /*resource_names=*/{"foo1", "foo2"});
stream->CompleteSendMessageFromClient(); stream->CompleteSendMessageFromClient();
// Make sure the watcher for foo1 does not see a does-not-exist event. // Make sure the watcher for foo1 does not see a does-not-exist event.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Cancel watches. // Cancel watches.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo2"); CancelFooWatch(watcher2.get(), "foo2");
@ -2992,7 +3013,7 @@ TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
// We should not see a resource-does-not-exist event, because the // We should not see a resource-does-not-exist event, because the
// resource was already cached, so the server can optimize by not // resource was already cached, so the server can optimize by not
// resending it. // resending it.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -3014,7 +3035,7 @@ TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
.AddFooResource(XdsFooResource("foo1", 6)) .AddFooResource(XdsFooResource("foo1", 6))
.Serialize()); .Serialize());
// Watcher will not see any update, since the resource is unchanged. // Watcher will not see any update, since the resource is unchanged.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); EXPECT_TRUE(watcher->ExpectNoEvent());
// Check metric data. // Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(::testing::Pair( ::testing::ElementsAre(::testing::Pair(
@ -3504,7 +3525,7 @@ TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
<< *error; << *error;
} }
TEST_F(XdsClientTest, FederationWithUnparsableXdstpResourceName) { TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) {
// Note: Not adding authority to bootstrap config. // Note: Not adding authority to bootstrap config.
InitXdsClient(); InitXdsClient();
// Start a watch for the xdstp resource name. // Start a watch for the xdstp resource name.
@ -3724,7 +3745,6 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
} }
TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor();
InitXdsClient(); InitXdsClient();
// Start watches for "foo1" and "foo2". // Start watches for "foo1" and "foo2".
auto watcher1 = StartFooWatch("foo1"); auto watcher1 = StartFooWatch("foo1");
@ -3777,11 +3797,11 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
/*version_info=*/"1", /*response_nonce=*/"A", /*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"}); /*resource_names=*/{"foo1", "foo2"});
EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); EXPECT_TRUE(stream->WaitForReadsStarted(1));
resource1->read_delay_handle.reset(); resource1->read_delay_handle.reset();
EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); EXPECT_TRUE(stream->WaitForReadsStarted(1));
resource2->read_delay_handle.reset(); resource2->read_delay_handle.reset();
EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); EXPECT_TRUE(stream->WaitForReadsStarted(2));
resource1 = watcher1->WaitForNextResourceAndHandle(); resource1 = watcher1->WaitForNextResourceAndHandle();
ASSERT_NE(resource1, absl::nullopt); ASSERT_NE(resource1, absl::nullopt);
EXPECT_EQ(resource1->resource->name, "foo1"); EXPECT_EQ(resource1->resource->name, "foo1");
@ -3794,9 +3814,9 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
/*version_info=*/"2", /*response_nonce=*/"B", /*version_info=*/"2", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(), /*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"}); /*resource_names=*/{"foo1", "foo2"});
EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); EXPECT_TRUE(stream->WaitForReadsStarted(2));
resource1->read_delay_handle.reset(); resource1->read_delay_handle.reset();
EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout)); EXPECT_TRUE(stream->WaitForReadsStarted(3));
// Cancel watch. // Cancel watch.
CancelFooWatch(watcher1.get(), "foo1"); CancelFooWatch(watcher1.get(), "foo1");
request = WaitForRequest(stream.get()); request = WaitForRequest(stream.get());
@ -4185,8 +4205,5 @@ TEST_F(XdsClientTest, FallbackOnStartup) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv);
grpc_init(); return RUN_ALL_TESTS();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
} }

@ -29,15 +29,12 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/util/orphanable.h" #include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h" #include "src/core/util/ref_counted_ptr.h"
#include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_bootstrap.h"
#include "test/core/test_util/test_config.h" #include "test/core/test_util/test_config.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
namespace grpc_core { namespace grpc_core {
// //
@ -60,8 +57,8 @@ FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
// synchronously, since those operations will trigger code in // synchronously, since those operations will trigger code in
// XdsClient that acquires its mutex, but it was already holding its // XdsClient that acquires its mutex, but it was already holding its
// mutex when it called us, so it would deadlock. // mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), event_engine_->Run([event_handler = std::move(event_handler_),
status_sent = status_sent_]() mutable { status_sent = status_sent_]() mutable {
ExecCtx exec_ctx; ExecCtx exec_ctx;
if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
event_handler.reset(); event_handler.reset();
@ -82,7 +79,6 @@ void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
MutexLock lock(&mu_); MutexLock lock(&mu_);
CHECK(!orphaned_); CHECK(!orphaned_);
from_client_messages_.push_back(std::move(payload)); from_client_messages_.push_back(std::move(payload));
cv_client_msg_.Signal();
if (transport_->auto_complete_messages_from_client()) { if (transport_->auto_complete_messages_from_client()) {
CompleteSendMessageFromClientLocked(/*ok=*/true); CompleteSendMessageFromClientLocked(/*ok=*/true);
} }
@ -94,18 +90,18 @@ bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
} }
absl::optional<std::string> absl::optional<std::string>
FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient( FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient() {
absl::Duration timeout) { while (true) {
MutexLock lock(&mu_); event_engine_->Tick();
while (from_client_messages_.empty()) { MutexLock lock(&mu_);
if (cv_client_msg_.WaitWithTimeout(&mu_, if (from_client_messages_.empty()) {
timeout * grpc_test_slowdown_factor())) { if (event_engine_->IsIdle()) return absl::nullopt;
return absl::nullopt; continue;
} }
std::string payload = std::move(from_client_messages_.front());
from_client_messages_.pop_front();
return payload;
} }
std::string payload = from_client_messages_.front();
from_client_messages_.pop_front();
return payload;
} }
void FakeXdsTransportFactory::FakeStreamingCall:: void FakeXdsTransportFactory::FakeStreamingCall::
@ -113,12 +109,11 @@ void FakeXdsTransportFactory::FakeStreamingCall::
// Can't call event_handler_->OnRequestSent() synchronously, since that // Can't call event_handler_->OnRequestSent() synchronously, since that
// operation will trigger code in XdsClient that acquires its mutex, but it // operation will trigger code in XdsClient that acquires its mutex, but it
// was already holding its mutex when it called us, so it would deadlock. // was already holding its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run( event_engine_->Run([event_handler = event_handler_->Ref(), ok]() mutable {
[event_handler = event_handler_->Ref(), ok]() mutable { ExecCtx exec_ctx;
ExecCtx exec_ctx; event_handler->OnRequestSent(ok);
event_handler->OnRequestSent(ok); event_handler.reset();
event_handler.reset(); });
});
} }
void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient( void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
@ -135,11 +130,10 @@ void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
} }
++reads_started_; ++reads_started_;
++num_pending_reads_; ++num_pending_reads_;
cv_reads_started_.SignalAll();
if (!to_client_messages_.empty()) { if (!to_client_messages_.empty()) {
// Dispatch pending message (if there's one) on a separate thread to avoid // Dispatch pending message (if there's one) on a separate thread to avoid
// recursion // recursion
GetDefaultEventEngine()->Run([call = RefAsSubclass<FakeStreamingCall>()]() { event_engine_->Run([call = RefAsSubclass<FakeStreamingCall>()]() {
call->MaybeDeliverMessageToClient(); call->MaybeDeliverMessageToClient();
}); });
} }
@ -187,6 +181,16 @@ void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
event_handler->OnStatusReceived(std::move(status)); event_handler->OnStatusReceived(std::move(status));
} }
bool FakeXdsTransportFactory::FakeStreamingCall::WaitForReadsStarted(
size_t expected) {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (reads_started_ == expected) return true;
if (event_engine_->IsIdle()) return false;
}
}
bool FakeXdsTransportFactory::FakeStreamingCall::IsOrphaned() { bool FakeXdsTransportFactory::FakeStreamingCall::IsOrphaned() {
MutexLock lock(&mu_); MutexLock lock(&mu_);
return orphaned_; return orphaned_;
@ -223,7 +227,7 @@ void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
// Can't destroy watchers synchronously, since that operation will trigger // Can't destroy watchers synchronously, since that operation will trigger
// code in XdsClient that acquires its mutex, but it was already holding // code in XdsClient that acquires its mutex, but it was already holding
// its mutex when it called us, so it would deadlock. // its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([watchers = std::move(watchers_)]() mutable { event_engine_->Run([watchers = std::move(watchers_)]() mutable {
ExecCtx exec_ctx; ExecCtx exec_ctx;
watchers.clear(); watchers.clear();
}); });
@ -231,17 +235,14 @@ void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
} }
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::WaitForStream( FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(const char* method) {
const char* method, absl::Duration timeout) { while (true) {
MutexLock lock(&mu_); event_engine_->Tick();
auto it = active_calls_.find(method); MutexLock lock(&mu_);
while (it == active_calls_.end() || it->second == nullptr) { auto it = active_calls_.find(method);
if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { if (it != active_calls_.end() && it->second != nullptr) return it->second;
return nullptr; if (event_engine_->IsIdle()) return nullptr;
}
it = active_calls_.find(method);
} }
return it->second;
} }
void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream( void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
@ -273,7 +274,6 @@ FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
WeakRefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler)); WeakRefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
MutexLock lock(&mu_); MutexLock lock(&mu_);
active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>(); active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
cv_.Signal();
return call; return call;
} }
@ -318,13 +318,14 @@ void FakeXdsTransportFactory::SetAbortOnUndrainedMessages(bool value) {
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server, FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
const char* method, const char* method) {
absl::Duration timeout) {
auto transport = GetTransport(server); auto transport = GetTransport(server);
if (transport == nullptr) return nullptr; if (transport == nullptr) return nullptr;
return transport->WaitForStream(method, timeout); return transport->WaitForStream(method);
} }
void FakeXdsTransportFactory::Orphaned() { event_engine_.reset(); }
RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport> RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) { FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
std::string key = server.Key(); std::string key = server.Key();

@ -40,6 +40,7 @@
#include "src/core/util/sync.h" #include "src/core/util/sync.h"
#include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_transport.h" #include "src/core/xds/xds_client/xds_transport.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
namespace grpc_core { namespace grpc_core {
@ -61,6 +62,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
std::unique_ptr<StreamingCall::EventHandler> event_handler) std::unique_ptr<StreamingCall::EventHandler> event_handler)
: transport_(std::move(transport)), : transport_(std::move(transport)),
method_(method), method_(method),
event_engine_(transport_->factory()->event_engine_),
event_handler_(MakeRefCounted<RefCountedEventHandler>( event_handler_(MakeRefCounted<RefCountedEventHandler>(
std::move(event_handler))) {} std::move(event_handler))) {}
@ -75,8 +77,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
using StreamingCall::Ref; // Make it public. using StreamingCall::Ref; // Make it public.
bool HaveMessageFromClient(); bool HaveMessageFromClient();
absl::optional<std::string> WaitForMessageFromClient( absl::optional<std::string> WaitForMessageFromClient();
absl::Duration timeout);
// If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient() // If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient()
// was called to set the value to false before the creation of the // was called to set the value to false before the creation of the
@ -88,16 +89,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void SendMessageToClient(absl::string_view payload); void SendMessageToClient(absl::string_view payload);
void MaybeSendStatusToClient(absl::Status status); void MaybeSendStatusToClient(absl::Status status);
bool WaitForReadsStarted(size_t expected, absl::Duration timeout) { bool WaitForReadsStarted(size_t expected);
MutexLock lock(&mu_);
const absl::Time deadline = absl::Now() + timeout;
do {
if (reads_started_ == expected) {
return true;
}
} while (!cv_reads_started_.WaitWithDeadline(&mu_, deadline));
return false;
}
private: private:
class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> { class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> {
@ -126,10 +118,10 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
WeakRefCountedPtr<FakeXdsTransport> transport_; WeakRefCountedPtr<FakeXdsTransport> transport_;
const char* method_; const char* method_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
Mutex mu_; Mutex mu_;
CondVar cv_reads_started_;
CondVar cv_client_msg_;
RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_); RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_);
std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_); std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_);
bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;
@ -140,8 +132,11 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
}; };
explicit FakeXdsTransportFactory( explicit FakeXdsTransportFactory(
std::function<void()> too_many_pending_reads_callback) std::function<void()> too_many_pending_reads_callback,
: too_many_pending_reads_callback_( std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine)
: event_engine_(std::move(event_engine)),
too_many_pending_reads_callback_(
std::move(too_many_pending_reads_callback)) {} std::move(too_many_pending_reads_callback)) {}
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
@ -169,10 +164,9 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void SetAbortOnUndrainedMessages(bool value); void SetAbortOnUndrainedMessages(bool value);
RefCountedPtr<FakeStreamingCall> WaitForStream( RefCountedPtr<FakeStreamingCall> WaitForStream(
const XdsBootstrap::XdsServer& server, const char* method, const XdsBootstrap::XdsServer& server, const char* method);
absl::Duration timeout);
void Orphaned() override {} void Orphaned() override;
private: private:
class FakeXdsTransport : public XdsTransport { class FakeXdsTransport : public XdsTransport {
@ -185,7 +179,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
server_(server), server_(server),
auto_complete_messages_from_client_( auto_complete_messages_from_client_(
auto_complete_messages_from_client), auto_complete_messages_from_client),
abort_on_undrained_messages_(abort_on_undrained_messages) {} abort_on_undrained_messages_(abort_on_undrained_messages),
event_engine_(factory_->event_engine_) {}
void Orphaned() override; void Orphaned() override;
@ -199,8 +194,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void TriggerConnectionFailure(absl::Status status); void TriggerConnectionFailure(absl::Status status);
RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method, RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method);
absl::Duration timeout);
void RemoveStream(const char* method, FakeStreamingCall* call); void RemoveStream(const char* method, FakeStreamingCall* call);
@ -224,9 +218,10 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
const XdsBootstrap::XdsServer& server_; const XdsBootstrap::XdsServer& server_;
const bool auto_complete_messages_from_client_; const bool auto_complete_messages_from_client_;
const bool abort_on_undrained_messages_; const bool abort_on_undrained_messages_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
Mutex mu_; Mutex mu_;
CondVar cv_;
std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers_ std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers_
ABSL_GUARDED_BY(&mu_); ABSL_GUARDED_BY(&mu_);
std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>> std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
@ -244,6 +239,9 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
RefCountedPtr<FakeXdsTransport> GetTransportLocked(const std::string& key) RefCountedPtr<FakeXdsTransport> GetTransportLocked(const std::string& key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
Mutex mu_; Mutex mu_;
std::map<std::string /*XdsServer key*/, FakeXdsTransport*> transport_map_ std::map<std::string /*XdsServer key*/, FakeXdsTransport*> transport_map_
ABSL_GUARDED_BY(&mu_); ABSL_GUARDED_BY(&mu_);

Loading…
Cancel
Save