XdsClient: fix behavior for does-not-exist timer and stream termination (#31446)

* XdsClient: don't run does-not-exist timer while channel is in TRANSIENT_FAILURE

* attempt to address test flakiness

* fix test cleanup

* add tests for long connection attempts

* improve comments

* add another test case

* test changes -- not all passing yet

* improve interface for connectivity state reporting

* report error on stream status only if no response seen

* iwyu

* fix test flakes
pull/31496/head
Mark D. Roth 2 years ago committed by GitHub
parent 49da06c3a1
commit 7cae004601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 184
      src/core/ext/xds/xds_client.cc
  2. 3
      src/core/ext/xds/xds_client.h
  3. 38
      src/core/ext/xds/xds_transport.h
  4. 23
      src/core/ext/xds/xds_transport_grpc.cc
  5. 9
      src/core/ext/xds/xds_transport_grpc.h
  6. 502
      test/core/xds/xds_client_test.cc
  7. 85
      test/core/xds/xds_transport_fake.cc
  8. 51
      test/core/xds/xds_transport_fake.h

@ -25,7 +25,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
@ -128,6 +127,9 @@ class XdsClient::ChannelState::AdsCallState
bool HasSubscribedResources() const;
void ChannelConnected() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void ChannelDisconnected() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
class AdsResponseParser : public XdsApi::AdsResponseParserInterface {
public:
@ -178,10 +180,29 @@ class XdsClient::ChannelState::AdsCallState
Unref(DEBUG_LOCATION, "Orphan");
}
void MarkSubscriptionSent(RefCountedPtr<AdsCallState> ads_calld)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
subscription_sent_ = true;
MaybeStartTimer(std::move(ads_calld));
}
void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
resource_seen_ = true;
MaybeCancelTimer();
}
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (!timer_start_needed_) return;
timer_start_needed_ = false;
// Don't start timer if we've already either seen the resource or
// marked it as non-existing.
if (resource_seen_) return;
// Don't start timer if we haven't yet sent the initial subscription
// request for the resource.
if (!subscription_sent_) return;
// Don't start timer if the channel is not connected.
if (!ads_calld->chand()->channel_connected_) return;
// Don't start timer if it's already running.
if (timer_handle_.has_value()) return;
// Check if we already have a cached version of this resource
// (i.e., if this is the initial request for the resource after an
// ADS stream restart). If so, we don't start the timer, because
@ -203,29 +224,14 @@ class XdsClient::ChannelState::AdsCallState
}
void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
// If the timer hasn't been started yet, make sure we don't start
// it later. This can happen if the last watch for a resource is
// cancelled and then restarted, both while an ADS request is
// being sent (causing the unsubscription and then resubscription
// requests to be queued), and then we get a response that
// contains that resource. In that case, we would call
// MaybeCancelTimer() when we receive the response and then
// MaybeStartTimer() when we finally send the new request, thus
// causing the timer to fire when it shouldn't. For details,
// see https://github.com/grpc/grpc/issues/29583.
timer_start_needed_ = false;
if (timer_handle_.has_value()) {
ads_calld_->xds_client()->engine()->Cancel(*timer_handle_);
if (timer_handle_.has_value() &&
ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
}
}
private:
void OnTimer() {
{
MutexLock lock(&ads_calld_->xds_client()->mu_);
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
@ -237,6 +243,10 @@ class XdsClient::ChannelState::AdsCallState
name_.authority, type_->type_url(), name_.key)
.c_str());
}
{
MutexLock lock(&ads_calld_->xds_client()->mu_);
timer_handle_.reset();
resource_seen_ = true;
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
@ -244,7 +254,6 @@ class XdsClient::ChannelState::AdsCallState
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
}
ads_calld_->xds_client()->work_serializer_.DrainQueue();
ads_calld_.reset();
}
@ -253,7 +262,13 @@ class XdsClient::ChannelState::AdsCallState
const XdsResourceName name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool timer_start_needed_ ABSL_GUARDED_BY(&XdsClient::mu_) = true;
// True if we have sent the initial subscription request for this
// resource on this ADS stream.
bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
// True if we have either (a) seen the resource in a response on this
// stream or (b) declared the resource to not exist due to the timer
// firing.
bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
};
@ -410,6 +425,81 @@ class XdsClient::ChannelState::LrsCallState
OrphanablePtr<Reporter> reporter_;
};
// Handles connectivity state reports from the transport.
class XdsClient::ChannelState::ConnectivityStateReporter
: public XdsTransportFactory::ConnectivityStateReporter {
public:
explicit ConnectivityStateReporter(
WeakRefCountedPtr<ChannelState> channel_state)
: channel_state_(std::move(channel_state)) {}
void ReportConnecting() override {
{
MutexLock lock(&channel_state_->xds_client_->mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds channel %p for server %s: "
"attempting to connect",
channel_state_->xds_client(), channel_state_.get(),
channel_state_->server_.server_uri().c_str());
}
SetChannelDisconnectedLocked();
}
channel_state_->xds_client_->work_serializer_.DrainQueue();
}
void ReportReady() override {
{
MutexLock lock(&channel_state_->xds_client_->mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds channel %p for server %s: connected",
channel_state_->xds_client(), channel_state_.get(),
channel_state_->server_.server_uri().c_str());
}
channel_state_->channel_connected_ = true;
// Notify the ADS call of the connectivity state, so that it can
// start timers as needed.
if (channel_state_->ads_calld_ != nullptr) {
auto* calld = channel_state_->ads_calld_->calld();
if (calld != nullptr) calld->ChannelConnected();
}
}
channel_state_->xds_client_->work_serializer_.DrainQueue();
}
void ReportTransientFailure(absl::Status status) override {
{
MutexLock lock(&channel_state_->xds_client_->mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds channel %p for server %s: "
"connectivity failed: %s",
channel_state_->xds_client(), channel_state_.get(),
channel_state_->server_.server_uri().c_str(),
status.ToString().c_str());
}
SetChannelDisconnectedLocked();
channel_state_->SetChannelStatusLocked(std::move(status));
}
channel_state_->xds_client_->work_serializer_.DrainQueue();
}
private:
void SetChannelDisconnectedLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
channel_state_->channel_connected_ = false;
// Notify the ADS call of the disconnection, so that it can stop
// timers as needed.
if (channel_state_->ads_calld_ != nullptr) {
auto* calld = channel_state_->ads_calld_->calld();
if (calld != nullptr) calld->ChannelDisconnected();
}
}
WeakRefCountedPtr<ChannelState> channel_state_;
};
//
// XdsClient::ChannelState
//
@ -429,10 +519,8 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
absl::Status status;
transport_ = xds_client_->transport_factory_->Create(
server,
[self = WeakRef(DEBUG_LOCATION, "OnConnectivityFailure")](
absl::Status status) {
self->OnConnectivityFailure(std::move(status));
},
std::make_unique<ConnectivityStateReporter>(
WeakRef(DEBUG_LOCATION, "OnConnectivityChange")),
&status);
GPR_ASSERT(transport_ != nullptr);
if (!status.ok()) SetChannelStatusLocked(std::move(status));
@ -516,14 +604,6 @@ void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
}
}
void XdsClient::ChannelState::OnConnectivityFailure(absl::Status status) {
{
MutexLock lock(&xds_client_->mu_);
SetChannelStatusLocked(std::move(status));
}
xds_client_->work_serializer_.DrainQueue();
}
void XdsClient::ChannelState::SetChannelStatusLocked(absl::Status status) {
if (shutting_down_) return;
status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
@ -768,7 +848,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(parsed_resource_name->key);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
res_it->second->MarkSeen();
}
}
}
@ -970,6 +1050,26 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
return false;
}
void XdsClient::ChannelState::AdsCallState::ChannelConnected() {
for (const auto& p : state_map_) {
for (const auto& q : p.second.subscribed_resources) {
for (auto& r : q.second) {
r.second->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
}
}
}
}
void XdsClient::ChannelState::AdsCallState::ChannelDisconnected() {
for (const auto& p : state_map_) {
for (const auto& q : p.second.subscribed_resources) {
for (auto& r : q.second) {
r.second->MaybeCancelTimer();
}
}
}
}
void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) {
MutexLock lock(&xds_client()->mu_);
send_message_pending_ = false;
@ -1108,9 +1208,14 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
parent_->OnCallFinishedLocked();
// Send error to all watchers for the channel.
chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat(
"xDS call failed; status: %s", status.ToString().c_str())));
// If we didn't receive a response on the stream, report the
// stream failure as a connectivity failure, which will report the
// error to all watchers of resources on this channel.
if (!seen_response_) {
chand()->SetChannelStatusLocked(absl::UnavailableError(
absl::StrCat("xDS call failed with no responses received; status: ",
status.ToString())));
}
}
}
xds_client()->work_serializer_.DrainQueue();
@ -1136,7 +1241,8 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
authority, type->type_url(), resource_key));
OrphanablePtr<ResourceTimer>& resource_timer = p.second;
resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
resource_timer->MarkSubscriptionSent(
Ref(DEBUG_LOCATION, "ResourceTimer"));
}
}
}

@ -204,7 +204,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
void OnConnectivityFailure(absl::Status status);
class ConnectivityStateReporter;
// Enqueues error notifications to watchers. Caller must drain
// XdsClient::work_serializer_ after releasing the lock.
@ -229,6 +229,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
resource_type_version_map_;
absl::Status status_;
bool channel_connected_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
};
struct ResourceState {

@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <memory>
#include <string>
@ -70,14 +69,45 @@ class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> {
virtual void ResetBackoff() = 0;
};
// An interface for reporting connectivity state events for a transport.
//
// A transport can be in one of three states:
// - CONNECTING: transport is attempting to establish a connection
// - READY: transport is connected and ready to start streams
// - TRANSIENT_FAILURE: connection attempt has failed
//
// A transport is assumed to start in state CONNECTING. Expected
// transitions:
// - CONNECTING -> READY (when connection is successfully established)
// - CONNECTING -> TRANSIENT_FAILURE (when the connection attempt fails)
// - TRANSIENT_FAILURE -> READY (when a connection attempt had failed
// but a subsequent attempt has succeeded)
// - READY -> CONNECTING (when an established connection fails)
//
// Note that a transport should not transition from TRANSIENT_FAILURE to
// CONNECTING; once the transport has failed a connection attempt, it
// should remain in TRANSIENT_FAILURE until a subsequent connection
// attempt succeeds.
class ConnectivityStateReporter {
public:
virtual ~ConnectivityStateReporter() = default;
// Invoked when the transport enters state CONNECTING.
virtual void ReportConnecting() = 0;
// Invoked when the transport enters state READY.
virtual void ReportReady() = 0;
// Invoked when the transport enters state TRANSIENT_FAILURE.
virtual void ReportTransientFailure(absl::Status status) = 0;
};
// Creates a new transport for the specified server.
// The on_connectivity_failure callback will be invoked whenever there is
// a connectivity failure on the transport.
// The transport will use connectivity_state_reporter to report its
// connectivity state.
// *status will be set if there is an error creating the channel,
// although the returned channel must still accept calls (which may fail).
virtual OrphanablePtr<XdsTransport> Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status) = 0;
};

@ -20,7 +20,6 @@
#include <string.h>
#include <functional>
#include <memory>
#include <utility>
@ -229,20 +228,24 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(
std::function<void(absl::Status)> on_connectivity_failure)
: on_connectivity_failure_(std::move(on_connectivity_failure)) {}
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter)
: connectivity_state_reporter_(std::move(connectivity_state_reporter)) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
on_connectivity_failure_(absl::Status(
if (new_state == GRPC_CHANNEL_READY) {
connectivity_state_reporter_->ReportReady();
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
connectivity_state_reporter_->ReportTransientFailure(absl::Status(
status.code(),
absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
} else { // IDLE or CONNECTING
connectivity_state_reporter_->ReportConnecting();
}
}
std::function<void(absl::Status)> on_connectivity_failure_;
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter_;
};
//
@ -270,7 +273,7 @@ bool IsLameChannel(grpc_channel* channel) {
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status)
: factory_(factory) {
channel_ = CreateXdsChannel(
@ -283,7 +286,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(std::move(on_connectivity_failure));
watcher_ = new StateWatcher(std::move(connectivity_state_reporter));
client_channel->AddConnectivityWatcher(
GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
@ -347,10 +350,10 @@ GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
OrphanablePtr<XdsTransportFactory::XdsTransport>
GrpcXdsTransportFactory::Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status) {
return MakeOrphanable<GrpcXdsTransport>(
this, server, std::move(on_connectivity_failure), status);
this, server, std::move(connectivity_state_reporter), status);
}
} // namespace grpc_core

@ -19,7 +19,6 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <memory>
#include <string>
@ -51,7 +50,7 @@ class GrpcXdsTransportFactory : public XdsTransportFactory {
OrphanablePtr<XdsTransport> Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status) override;
grpc_pollset_set* interested_parties() const { return interested_parties_; }
@ -66,9 +65,9 @@ class GrpcXdsTransportFactory::GrpcXdsTransport
public:
class GrpcStreamingCall;
GrpcXdsTransport(GrpcXdsTransportFactory* factory,
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
GrpcXdsTransport(
GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status);
~GrpcXdsTransport() override;

@ -236,6 +236,7 @@ class XdsClientTest : public ::testing::Test {
all_resources_required_in_sotw>,
ResourceStruct>::WatcherInterface {
public:
// Returns true if no event is received during the timeout period.
bool ExpectNoEvent(absl::Duration timeout) {
MutexLock lock(&mu_);
return !WaitForEventLocked(timeout);
@ -323,6 +324,8 @@ class XdsClientTest : public ::testing::Test {
cv_.Signal();
}
// Returns true if an event was received, or false if the timeout
// expires before any event is received.
bool WaitForEventLocked(absl::Duration timeout)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
while (queue_.empty()) {
@ -562,13 +565,12 @@ class XdsClientTest : public ::testing::Test {
// specified bootstrap config.
void InitXdsClient(
FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
Duration resource_request_timeout = Duration::Seconds(15) *
grpc_test_slowdown_factor()) {
Duration resource_request_timeout = Duration::Seconds(15)) {
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>();
transport_factory_ = transport_factory->Ref();
xds_client_ = MakeRefCounted<XdsClient>(bootstrap_builder.Build(),
std::move(transport_factory),
resource_request_timeout);
xds_client_ = MakeRefCounted<XdsClient>(
bootstrap_builder.Build(), std::move(transport_factory),
resource_request_timeout * grpc_test_slowdown_factor());
}
// Starts and cancels a watch for a Foo resource.
@ -624,11 +626,23 @@ class XdsClientTest : public ::testing::Test {
timeout * grpc_test_slowdown_factor());
}
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
void ReportConnecting(const XdsBootstrap::XdsServer& server) {
const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server);
GPR_ASSERT(xds_server != nullptr);
transport_factory_->ReportConnecting(*xds_server);
}
void ReportReady(const XdsBootstrap::XdsServer& server) {
const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server);
GPR_ASSERT(xds_server != nullptr);
transport_factory_->ReportReady(*xds_server);
}
void ReportTransientFailure(const XdsBootstrap::XdsServer& server,
absl::Status status) {
const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server);
GPR_ASSERT(xds_server != nullptr);
transport_factory_->TriggerConnectionFailure(*xds_server, status);
transport_factory_->ReportTransientFailure(*xds_server, std::move(status));
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
@ -1830,14 +1844,8 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
/*resource_names=*/{"foo1"});
// Now server closes the stream.
stream->MaybeSendStatusToClient(absl::OkStatus());
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
@ -1853,69 +1861,106 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
CheckRequestNode(*request); // Should be present on the first request.
// Before the server resends the resource, start a new watcher for the
// same resource. This watcher should immediately receive the cached
// resource and then the error notification -- in that order.
// resource.
auto watcher2 = StartFooWatch("foo1");
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
error = watcher2->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// Create a watcher for a new resource. This should immediately
// receive the cached channel error.
auto watcher3 = StartFooWatch("foo2");
error = watcher3->WaitForNextError();
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("B")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// Watcher does NOT get an update, since the resource has not changed.
EXPECT_FALSE(watcher->WaitForNextResource());
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
InitXdsClient();
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server closes the stream without sending a response.
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
// XdsClient should report an error to the watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
"xDS channel for server default_xds_server: xDS call failed "
"with no responses received; status: UNAVAILABLE: ugh "
"(node ID:xds_client_test)")
<< *error;
// And the client will send a new request subscribing to the new resource.
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient sends a subscription request.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// Server now sends the requested resources.
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("B")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.AddFooResource(XdsFooResource("foo2", 7))
.Serialize());
// Watchers for foo1 do NOT get an update, since the resource has not changed.
EXPECT_FALSE(watcher->WaitForNextResource());
EXPECT_FALSE(watcher2->WaitForNextResource());
// The watcher for foo2 gets the newly delivered resource.
resource = watcher3->WaitForNextResource();
// Watcher gets the resource.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// Cancel watchers.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher3.get(), "foo1");
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
@ -1960,14 +2005,8 @@ TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) {
/*resource_names=*/{"foo1"});
// Now server closes the stream.
stream->MaybeSendStatusToClient(absl::OkStatus());
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
@ -1998,8 +2037,91 @@ TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) {
}
}
// This test is similar to the previous test, except that the resource
// is not received on the original stream before the stream fails. That
// difference means that we are not expecting the optimization where the
// server may not re-send the resource, which means that we instead
// trigger the does-not-exist timeout.
TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
// Lower resources-does-not-exist timeout so test finishes faster.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Stream fails.
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed "
"with no responses received; status: UNAVAILABLE: ugh "
"(node ID:xds_client_test)")
<< *error;
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient sends a subscription request.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send a response immediately.
// Client should receive a resource does-not-exist.
ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// The resource is delivered to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, ConnectionFails) {
InitXdsClient();
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport not to immediately report that it is connected.
transport_factory_->SetAutoReportTransportReady(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
@ -2016,7 +2138,7 @@ TEST_F(XdsClientTest, ConnectionFails) {
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Transport reports connection failure.
TriggerConnectionFailure(xds_client_->bootstrap().server(),
ReportTransientFailure(xds_client_->bootstrap().server(),
absl::UnavailableError("connection failed"));
// XdsClient should report an error to the watcher.
auto error = watcher->WaitForNextError();
@ -2026,6 +2148,9 @@ TEST_F(XdsClientTest, ConnectionFails) {
"xDS channel for server default_xds_server: "
"connection failed (node ID:xds_client_test)")
<< *error;
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// Start a new watch. This watcher should be given the same error,
// since we have not yet recovered.
auto watcher2 = StartFooWatch("foo1");
@ -2036,11 +2161,12 @@ TEST_F(XdsClientTest, ConnectionFails) {
"xDS channel for server default_xds_server: "
"connection failed (node ID:xds_client_test)")
<< *error;
// Inside the XdsTransport interface, the channel will eventually
// reconnect, and the call will proceed. None of that will be visible
// to the XdsClient, because the call uses wait_for_ready. So here,
// to simulate the connection being established, all we need to do is
// allow the stream to proceed.
// Second watcher should not see resource-does-not-exist either.
EXPECT_FALSE(watcher2->HasEvent());
// Report channel as having become connected.
ReportReady(xds_client_->bootstrap().server());
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel reconnects, the already-started stream will proceed.
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
@ -2060,6 +2186,252 @@ TEST_F(XdsClientTest, ConnectionFails) {
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, LongConnectionAttempt) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport not to immediately report that it is connected.
transport_factory_->SetAutoReportTransportReady(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// Report channel as having become connected.
ReportReady(xds_client_->bootstrap().server());
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel connects, the already-started stream will proceed.
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, LongReconnectionAttempt) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport not to immediately report that it is connected.
transport_factory_->SetAutoReportTransportReady(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Transport reports connection.
ReportReady(xds_client_->bootstrap().server());
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Transport reports disconnection.
ReportConnecting(xds_client_->bootstrap().server());
// Stream fails because of transport disconnection.
stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// XdsClient creates a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Report channel as having become connected.
ReportReady(xds_client_->bootstrap().server());
// We should not see a resource-does-not-exist event, because the
// resource was already cached, so the server can optimize by not
// resending it.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel connects, the already-started stream will proceed.
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// Watcher will not see any update, since the resource is unchanged.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
// This test is similar to the previous test, except that the resource
// is not received on the original stream before the stream fails. That
// difference means that we are not expecting the optimization where the
// server may not re-send the resource, which means that we instead
// trigger the does-not-exist timeout.
TEST_F(XdsClientTest, LongReconnectionAttemptBeforeResourceReceived) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport not to immediately report that it is connected.
transport_factory_->SetAutoReportTransportReady(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Transport reports connected.
ReportReady(xds_client_->bootstrap().server());
// Server does NOT send a response.
// Now report channel as having become disconnected.
ReportConnecting(xds_client_->bootstrap().server());
// Stream fails because of transport disconnection.
stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
// XdsClient should report an error to the watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed "
"with no responses received; status: "
"UNAVAILABLE: connection failed (node ID:xds_client_test)")
<< *error;
// XdsClient creates a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// Now report channel as having become connected.
ReportReady(xds_client_->bootstrap().server());
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel connects, the already-started stream will proceed.
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient sends the resource to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
@ -2578,7 +2950,7 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{kXdstpResourceName});
// Now cause a channel failure on the stream to the authority's xDS server.
TriggerConnectionFailure(authority_server,
ReportTransientFailure(authority_server,
absl::UnavailableError("connection failed"));
// The watcher for the xdstp resource name should see the error.
auto error = watcher2->WaitForNextError();

@ -20,7 +20,6 @@
#include <stdint.h>
#include <functional>
#include <memory>
#include <utility>
@ -138,29 +137,69 @@ void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
// FakeXdsTransportFactory::FakeXdsTransport
//
void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
FakeXdsTransportFactory::FakeXdsTransport::FakeXdsTransport(
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
bool auto_complete_messages_from_client, bool auto_report_transport_ready)
: auto_complete_messages_from_client_(auto_complete_messages_from_client),
connectivity_state_reporter_(std::move(connectivity_state_reporter)) {
if (auto_report_transport_ready) {
// Send connectivity change update indicating the channel is connected.
GetDefaultEventEngine()->Run(
[connectivity_state_reporter = connectivity_state_reporter_]() mutable {
ExecCtx exec_ctx;
connectivity_state_reporter->ReportReady();
connectivity_state_reporter.reset();
});
}
}
void FakeXdsTransportFactory::FakeXdsTransport::ReportConnecting() {
std::shared_ptr<ConnectivityStateReporter> connectivity_state_reporter;
{
MutexLock lock(&mu_);
connectivity_state_reporter = connectivity_state_reporter_;
}
ExecCtx exec_ctx;
if (connectivity_state_reporter != nullptr) {
connectivity_state_reporter->ReportConnecting();
}
}
void FakeXdsTransportFactory::FakeXdsTransport::ReportReady() {
std::shared_ptr<ConnectivityStateReporter> connectivity_state_reporter;
{
MutexLock lock(&mu_);
connectivity_state_reporter = connectivity_state_reporter_;
}
ExecCtx exec_ctx;
if (connectivity_state_reporter != nullptr) {
connectivity_state_reporter->ReportReady();
}
}
void FakeXdsTransportFactory::FakeXdsTransport::ReportTransientFailure(
absl::Status status) {
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
std::shared_ptr<ConnectivityStateReporter> connectivity_state_reporter;
{
MutexLock lock(&mu_);
on_connectivity_failure = on_connectivity_failure_->Ref();
connectivity_state_reporter = connectivity_state_reporter_;
}
ExecCtx exec_ctx;
if (on_connectivity_failure != nullptr) {
on_connectivity_failure->Run(std::move(status));
if (connectivity_state_reporter != nullptr) {
connectivity_state_reporter->ReportTransientFailure(std::move(status));
}
}
void FakeXdsTransportFactory::FakeXdsTransport::Orphan() {
{
MutexLock lock(&mu_);
// Can't destroy on_connectivity_failure_ synchronously, since that
// Can't destroy on_connectivity_change_ synchronously, since that
// operation will trigger code in XdsClient that acquires its mutex, but
// it was already holding its mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([on_connectivity_failure = std::move(
on_connectivity_failure_)]() mutable {
GetDefaultEventEngine()->Run([connectivity_state_reporter = std::move(
connectivity_state_reporter_)]() mutable {
ExecCtx exec_ctx;
on_connectivity_failure.reset();
connectivity_state_reporter.reset();
});
}
Unref();
@ -211,21 +250,34 @@ constexpr char FakeXdsTransportFactory::kAdsV2Method[];
OrphanablePtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* /*status*/) {
MutexLock lock(&mu_);
auto& entry = transport_map_[&server];
GPR_ASSERT(entry == nullptr);
auto transport = MakeOrphanable<FakeXdsTransport>(
std::move(on_connectivity_failure), auto_complete_messages_from_client_);
std::move(connectivity_state_reporter),
auto_complete_messages_from_client_, auto_report_transport_ready_);
entry = transport->Ref();
return transport;
}
void FakeXdsTransportFactory::TriggerConnectionFailure(
void FakeXdsTransportFactory::ReportConnecting(
const XdsBootstrap::XdsServer& server) {
auto transport = GetTransport(server);
transport->ReportConnecting();
}
void FakeXdsTransportFactory::ReportReady(
const XdsBootstrap::XdsServer& server) {
auto transport = GetTransport(server);
transport->ReportReady();
}
void FakeXdsTransportFactory::ReportTransientFailure(
const XdsBootstrap::XdsServer& server, absl::Status status) {
auto transport = GetTransport(server);
transport->TriggerConnectionFailure(std::move(status));
transport->ReportTransientFailure(std::move(status));
}
void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
@ -233,6 +285,11 @@ void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
auto_complete_messages_from_client_ = value;
}
void FakeXdsTransportFactory::SetAutoReportTransportReady(bool value) {
MutexLock lock(&mu_);
auto_report_transport_ready_ = value;
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
const char* method,

@ -20,7 +20,6 @@
#include <grpc/support/port_platform.h>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
@ -119,7 +118,9 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
using XdsTransportFactory::Ref; // Make it public.
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
void ReportConnecting(const XdsBootstrap::XdsServer& server);
void ReportReady(const XdsBootstrap::XdsServer& server);
void ReportTransientFailure(const XdsBootstrap::XdsServer& server,
absl::Status status);
// By default, FakeStreamingCall will automatically invoke
@ -133,6 +134,17 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
// will not be affected.
void SetAutoCompleteMessagesFromClient(bool value);
// By default, FakeTransport will immediately report to the XdsClient
// that it is connected as soon as it is created. If this is set to
// false, that behavior will be inhibited, and the test must invoke
// ReportReady() to explicitly report to the XdsClient that
// connectivity has been established.
//
// This value affects all transports created after this call is
// complete. Any transport that already exists prior to this call
// will not be affected.
void SetAutoReportTransportReady(bool value);
RefCountedPtr<FakeStreamingCall> WaitForStream(
const XdsBootstrap::XdsServer& server, const char* method,
absl::Duration timeout);
@ -142,13 +154,10 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
private:
class FakeXdsTransport : public XdsTransport {
public:
FakeXdsTransport(std::function<void(absl::Status)> on_connectivity_failure,
bool auto_complete_messages_from_client)
: auto_complete_messages_from_client_(
auto_complete_messages_from_client),
on_connectivity_failure_(
MakeRefCounted<RefCountedOnConnectivityFailure>(
std::move(on_connectivity_failure))) {}
FakeXdsTransport(
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
bool auto_complete_messages_from_client,
bool auto_report_transport_ready);
void Orphan() override;
@ -158,7 +167,9 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
using XdsTransport::Ref; // Make it public.
void TriggerConnectionFailure(absl::Status status);
void ReportConnecting();
void ReportReady();
void ReportTransientFailure(absl::Status status);
RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method,
absl::Duration timeout);
@ -166,21 +177,6 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
void RemoveStream(const char* method, FakeStreamingCall* call);
private:
class RefCountedOnConnectivityFailure
: public RefCounted<RefCountedOnConnectivityFailure> {
public:
explicit RefCountedOnConnectivityFailure(
std::function<void(absl::Status)> on_connectivity_failure)
: on_connectivity_failure_(std::move(on_connectivity_failure)) {}
void Run(absl::Status status) {
on_connectivity_failure_(std::move(status));
}
private:
std::function<void(absl::Status)> on_connectivity_failure_;
};
OrphanablePtr<StreamingCall> CreateStreamingCall(
const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
@ -191,7 +187,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
Mutex mu_;
CondVar cv_;
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_
std::shared_ptr<ConnectivityStateReporter> connectivity_state_reporter_
ABSL_GUARDED_BY(&mu_);
std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
active_calls_ ABSL_GUARDED_BY(&mu_);
@ -199,7 +195,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
OrphanablePtr<XdsTransport> Create(
const XdsBootstrap::XdsServer& server,
std::function<void(absl::Status)> on_connectivity_failure,
std::unique_ptr<ConnectivityStateReporter> connectivity_state_reporter,
absl::Status* status) override;
RefCountedPtr<FakeXdsTransport> GetTransport(
@ -209,6 +205,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
std::map<const XdsBootstrap::XdsServer*, RefCountedPtr<FakeXdsTransport>>
transport_map_ ABSL_GUARDED_BY(&mu_);
bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true;
bool auto_report_transport_ready_ ABSL_GUARDED_BY(&mu_) = true;
};
} // namespace grpc_core

Loading…
Cancel
Save