update fake transport impl

pull/37605/head
Mark D. Roth 6 months ago
parent b465528150
commit 5109d251a9
  1. 11
      test/core/xds/xds_client_fuzzer.cc
  2. 70
      test/core/xds/xds_client_test.cc
  3. 71
      test/core/xds/xds_transport_fake.cc
  4. 58
      test/core/xds/xds_transport_fake.h

@ -61,13 +61,12 @@ class Fuzzer {
// Leave xds_client_ unset, so Act() will be a no-op.
return;
}
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
[]() { Crash("Multiple concurrent reads"); });
transport_factory->SetAutoCompleteMessagesFromClient(false);
transport_factory->SetAbortOnUndrainedMessages(false);
transport_factory_ = transport_factory.get();
transport_factory_->SetAutoCompleteMessagesFromClient(false);
transport_factory_->SetAbortOnUndrainedMessages(false);
xds_client_ = MakeRefCounted<XdsClient>(
std::move(*bootstrap), std::move(transport_factory),
std::move(*bootstrap), transport_factory_,
grpc_event_engine::experimental::GetDefaultEventEngine(),
/*metrics_reporter=*/nullptr, "foo agent", "foo version");
}
@ -322,7 +321,7 @@ class Fuzzer {
}
RefCountedPtr<XdsClient> xds_client_;
FakeXdsTransportFactory* transport_factory_;
RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
// Maps of currently active watchers for each resource type, keyed by
// resource name.

@ -729,14 +729,12 @@ class XdsClientTest : public ::testing::Test {
void InitXdsClient(
FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
Duration resource_request_timeout = Duration::Seconds(15)) {
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
[]() { FAIL() << "Multiple concurrent reads"; });
transport_factory_ =
transport_factory->Ref().TakeAsSubclass<FakeXdsTransportFactory>();
auto metrics_reporter = std::make_unique<MetricsReporter>();
metrics_reporter_ = metrics_reporter.get();
xds_client_ = MakeRefCounted<XdsClient>(
bootstrap_builder.Build(), std::move(transport_factory),
bootstrap_builder.Build(), transport_factory_,
grpc_event_engine::experimental::GetDefaultEventEngine(),
std::move(metrics_reporter), "foo agent", "foo version",
resource_request_timeout * grpc_test_slowdown_factor());
@ -986,7 +984,7 @@ TEST_F(XdsClientTest, BasicWatch) {
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
// Check metric data.
EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
::testing::ElementsAre(::testing::Pair(
@ -1081,7 +1079,7 @@ TEST_F(XdsClientTest, UpdateFromServer) {
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
@ -1185,7 +1183,7 @@ TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
ASSERT_FALSE(WaitForRequest(stream.get()));
// Now cancel the second watcher.
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, SubscribeToMultipleResources) {
@ -1319,7 +1317,7 @@ TEST_F(XdsClientTest, SubscribeToMultipleResources) {
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
// Now cancel watch for "foo2".
CancelFooWatch(watcher2.get(), "foo2");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
@ -1441,7 +1439,7 @@ TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
// Now cancel watch for "foo2".
CancelFooWatch(watcher2.get(), "foo2");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceValidationFailure) {
@ -1568,7 +1566,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) {
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
@ -1767,7 +1765,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true);
CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true);
CancelFooWatch(watcher4.get(), "foo4");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
@ -1882,7 +1880,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
@ -1949,7 +1947,7 @@ TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
/*resource_names=*/{"wc1"});
// Cancel watch.
CancelWildcardCapableWatch(watcher.get(), "wc1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
// This tests resource removal triggered by the server when using a
@ -2079,7 +2077,7 @@ TEST_F(XdsClientTest, ResourceDeletion) {
// Cancel watch.
CancelWildcardCapableWatch(watcher.get(), "wc1");
CancelWildcardCapableWatch(watcher2.get(), "wc1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
// This tests that when we ignore resource deletions from the server
@ -2213,7 +2211,7 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
// Cancel watch.
CancelWildcardCapableWatch(watcher.get(), "wc1");
CancelWildcardCapableWatch(watcher2.get(), "wc1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, StreamClosedByServer) {
@ -2262,7 +2260,7 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// Stream should be orphaned.
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
// Check metric data.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, true)));
@ -2306,7 +2304,7 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
@ -2390,7 +2388,7 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ConnectionFails) {
@ -2486,7 +2484,7 @@ TEST_F(XdsClientTest, ConnectionFails) {
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
@ -2572,7 +2570,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
@ -2687,7 +2685,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
@ -2765,7 +2763,7 @@ TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
stream->CompleteSendMessageFromClient();
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
// In https://github.com/grpc/grpc/issues/29583, we ran into a case
@ -2921,7 +2919,7 @@ TEST_F(XdsClientTest,
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo2");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
@ -3041,7 +3039,7 @@ TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
@ -3096,7 +3094,7 @@ TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, MultipleResourceTypes) {
@ -3211,7 +3209,7 @@ TEST_F(XdsClientTest, MultipleResourceTypes) {
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
// Now cancel watch for "bar1".
CancelBarWatch(watcher2.get(), "bar1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, Federation) {
@ -3360,10 +3358,10 @@ TEST_F(XdsClientTest, Federation) {
/*resource_names=*/{kXdstpResourceName});
// Cancel watch for "foo1".
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
// Now cancel watch for xdstp resource name.
CancelFooWatch(watcher2.get(), kXdstpResourceName);
EXPECT_TRUE(stream2->Orphaned());
EXPECT_TRUE(stream2->IsOrphaned());
}
TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
@ -3486,7 +3484,7 @@ TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
/*resource_names=*/{kXdstpResourceName});
// Now cancel watch for xdstp resource name.
CancelFooWatch(watcher2.get(), kXdstpResourceName);
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
@ -3566,7 +3564,7 @@ TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
/*resource_names=*/{kXdstpResourceName});
// Cancel watch.
CancelFooWatch(watcher.get(), kXdstpResourceName);
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
@ -3719,10 +3717,10 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
::testing::Pair(authority_server.server_uri(), 1))));
// Cancel watch for "foo1".
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
// Now cancel watch for xdstp resource name.
CancelFooWatch(watcher2.get(), kXdstpResourceName);
EXPECT_TRUE(stream2->Orphaned());
EXPECT_TRUE(stream2->IsOrphaned());
}
TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
@ -3808,7 +3806,7 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo2"});
CancelFooWatch(watcher2.get(), "foo2");
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
TEST_F(XdsClientTest, FallbackAndRecover) {
@ -4017,7 +4015,7 @@ TEST_F(XdsClientTest, FallbackAndRecover) {
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, true)));
// Result (remote): The stream to the fallback server has been orphaned.
EXPECT_TRUE(stream2->Orphaned());
EXPECT_TRUE(stream2->IsOrphaned());
// Result (local): Resources are delivered to watchers.
resource = watcher->WaitForNextResource();
ASSERT_NE(resource, nullptr);
@ -4038,7 +4036,7 @@ TEST_F(XdsClientTest, FallbackAndRecover) {
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo2");
// Result (remote): The stream to the primary server has been orphaned.
EXPECT_TRUE(stream->Orphaned());
EXPECT_TRUE(stream->IsOrphaned());
}
// Test for both servers being unavailable
@ -4165,7 +4163,7 @@ TEST_F(XdsClientTest, FallbackOnStartup) {
.set_nonce("D")
.AddFooResource(XdsFooResource("foo1", 42))
.Serialize());
EXPECT_TRUE(fallback_stream->Orphaned());
EXPECT_TRUE(fallback_stream->IsOrphaned());
resource = watcher->WaitForNextResource();
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");

@ -187,7 +187,7 @@ void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
event_handler->OnStatusReceived(std::move(status));
}
bool FakeXdsTransportFactory::FakeStreamingCall::Orphaned() {
bool FakeXdsTransportFactory::FakeStreamingCall::IsOrphaned() {
MutexLock lock(&mu_);
return orphaned_;
}
@ -198,18 +198,18 @@ bool FakeXdsTransportFactory::FakeStreamingCall::Orphaned() {
void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
absl::Status status) {
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers;
{
MutexLock lock(&mu_);
on_connectivity_failure = on_connectivity_failure_;
watchers = watchers_;
}
ExecCtx exec_ctx;
if (on_connectivity_failure != nullptr) {
on_connectivity_failure->Run(std::move(status));
for (const auto& watcher : watchers) {
watcher->OnConnectivityFailure(status);
}
}
void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
{
MutexLock lock(&factory_->mu_);
auto it = factory_->transport_map_.find(server_.Key());
@ -220,16 +220,14 @@ void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
factory_.reset();
{
MutexLock lock(&mu_);
// Can't destroy on_connectivity_failure_ synchronously, since that
// operation will trigger code in XdsClient that acquires its mutex, but
// it was already holding its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([on_connectivity_failure = std::move(
on_connectivity_failure_)]() mutable {
// Can't destroy watchers synchronously, since that operation will trigger
// code in XdsClient that acquires its mutex, but it was already holding
// its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([watchers = std::move(watchers_)]() mutable {
ExecCtx exec_ctx;
on_connectivity_failure.reset();
watchers.clear();
});
}
Unref();
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
@ -255,12 +253,24 @@ void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
}
}
void FakeXdsTransportFactory::FakeXdsTransport::StartConnectivityFailureWatch(
RefCountedPtr<ConnectivityFailureWatcher> watcher) {
MutexLock lock(&mu_);
watchers_.insert(std::move(watcher));
}
void FakeXdsTransportFactory::FakeXdsTransport::StopConnectivityFailureWatch(
const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
MutexLock lock(&mu_);
watchers_.erase(watcher);
}
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler) {
auto call = MakeOrphanable<FakeStreamingCall>(
RefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
WeakRefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
MutexLock lock(&mu_);
active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
cv_.Signal();
@ -274,19 +284,18 @@ FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
constexpr char FakeXdsTransportFactory::kAdsMethod[];
constexpr char FakeXdsTransportFactory::kLrsMethod[];
OrphanablePtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
absl::Status* /*status*/) {
RefCountedPtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
absl::Status* /*status*/) {
std::string key = server.Key();
MutexLock lock(&mu_);
auto& entry = transport_map_[server.Key()];
CHECK(entry == nullptr);
auto transport = MakeOrphanable<FakeXdsTransport>(
RefAsSubclass<FakeXdsTransportFactory>(), server,
std::move(on_connectivity_failure), auto_complete_messages_from_client_,
abort_on_undrained_messages_);
entry = transport->Ref().TakeAsSubclass<FakeXdsTransport>();
auto transport = GetTransportLocked(key);
if (transport == nullptr) {
transport = MakeRefCounted<FakeXdsTransport>(
WeakRefAsSubclass<FakeXdsTransportFactory>(), server,
auto_complete_messages_from_client_, abort_on_undrained_messages_);
transport_map_.emplace(std::move(key), transport.get());
}
return transport;
}
@ -318,8 +327,16 @@ FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
std::string key = server.Key();
MutexLock lock(&mu_);
return transport_map_[server.Key()];
return GetTransportLocked(key);
}
RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
FakeXdsTransportFactory::GetTransportLocked(const std::string& key) {
auto it = transport_map_.find(key);
if (it == transport_map_.end()) return nullptr;
return it->second->RefIfNonZero().TakeAsSubclass<FakeXdsTransport>();
}
} // namespace grpc_core

@ -57,7 +57,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
class FakeStreamingCall : public XdsTransport::StreamingCall {
public:
FakeStreamingCall(
RefCountedPtr<FakeXdsTransport> transport, const char* method,
WeakRefCountedPtr<FakeXdsTransport> transport, const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler)
: transport_(std::move(transport)),
method_(method),
@ -68,6 +68,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void Orphan() override;
bool IsOrphaned();
void StartRecvMessage() override;
using StreamingCall::Ref; // Make it public.
@ -86,8 +88,6 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void SendMessageToClient(absl::string_view payload);
void MaybeSendStatusToClient(absl::Status status);
bool Orphaned();
bool WaitForReadsStarted(size_t expected, absl::Duration timeout) {
MutexLock lock(&mu_);
const absl::Time deadline = absl::Now() + timeout;
@ -124,7 +124,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void MaybeDeliverMessageToClient();
RefCountedPtr<FakeXdsTransport> transport_;
WeakRefCountedPtr<FakeXdsTransport> transport_;
const char* method_;
Mutex mu_;
@ -144,8 +144,6 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
: too_many_pending_reads_callback_(
std::move(too_many_pending_reads_callback)) {}
using XdsTransportFactory::Ref; // Make it public.
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
absl::Status status);
@ -174,26 +172,22 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
const XdsBootstrap::XdsServer& server, const char* method,
absl::Duration timeout);
void Orphan() override { Unref(); }
void Orphaned() override {}
private:
class FakeXdsTransport : public XdsTransport {
public:
FakeXdsTransport(RefCountedPtr<FakeXdsTransportFactory> factory,
FakeXdsTransport(WeakRefCountedPtr<FakeXdsTransportFactory> factory,
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
bool auto_complete_messages_from_client,
bool abort_on_undrained_messages)
: factory_(std::move(factory)),
server_(server),
auto_complete_messages_from_client_(
auto_complete_messages_from_client),
abort_on_undrained_messages_(abort_on_undrained_messages),
on_connectivity_failure_(
MakeRefCounted<RefCountedOnConnectivityFailure>(
std::move(on_connectivity_failure))) {}
abort_on_undrained_messages_(abort_on_undrained_messages) {}
void Orphan() override;
void Orphaned() override;
bool auto_complete_messages_from_client() const {
return auto_complete_messages_from_client_;
@ -203,8 +197,6 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
return abort_on_undrained_messages_;
}
using XdsTransport::Ref; // Make it public.
void TriggerConnectionFailure(absl::Status status);
RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method,
@ -217,20 +209,10 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
const XdsBootstrap::XdsServer* server() const { return &server_; }
private:
class RefCountedOnConnectivityFailure
: public RefCounted<RefCountedOnConnectivityFailure> {
public:
explicit RefCountedOnConnectivityFailure(
std::function<void(absl::Status)> on_connectivity_failure)
: on_connectivity_failure_(std::move(on_connectivity_failure)) {}
void Run(absl::Status status) {
on_connectivity_failure_(std::move(status));
}
private:
std::function<void(absl::Status)> on_connectivity_failure_;
};
void StartConnectivityFailureWatch(
RefCountedPtr<ConnectivityFailureWatcher> watcher) override;
void StopConnectivityFailureWatch(
const RefCountedPtr<ConnectivityFailureWatcher>& watcher) override;
OrphanablePtr<StreamingCall> CreateStreamingCall(
const char* method,
@ -238,29 +220,33 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void ResetBackoff() override {}
RefCountedPtr<FakeXdsTransportFactory> factory_;
WeakRefCountedPtr<FakeXdsTransportFactory> factory_;
const XdsBootstrap::XdsServer& server_;
const bool auto_complete_messages_from_client_;
const bool abort_on_undrained_messages_;
Mutex mu_;
CondVar cv_;
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_
std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers_
ABSL_GUARDED_BY(&mu_);
std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
active_calls_ ABSL_GUARDED_BY(&mu_);
};
OrphanablePtr<XdsTransport> Create(
// Returns an existing transport or creates a new one.
RefCountedPtr<XdsTransport> GetTransport(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
absl::Status* status) override;
absl::Status* /*status*/) override;
// Returns an existing transport, if any, or nullptr.
RefCountedPtr<FakeXdsTransport> GetTransport(
const XdsBootstrap::XdsServer& server);
RefCountedPtr<FakeXdsTransport> GetTransportLocked(const std::string& key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
Mutex mu_;
std::map<std::string /*XdsServer key*/, RefCountedPtr<FakeXdsTransport>>
std::map<std::string /*XdsServer key*/, FakeXdsTransport*>
transport_map_ ABSL_GUARDED_BY(&mu_);
bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true;
bool abort_on_undrained_messages_ ABSL_GUARDED_BY(&mu_) = true;

Loading…
Cancel
Save