Second attempt: XdsClient: fix behavior for does-not-exist timer and stream termination (#31507)

* Revert "Revert "XdsClient: fix behavior for does-not-exist timer and stream termination (#31446)" (#31501)"

This reverts commit 824049da60.

* trigger does-not-exist timer on send_message completion instead of channel connectivity state

* reorganize and de-dup tests
pull/31519/head
Mark D. Roth 2 years ago committed by GitHub
parent a0fb351023
commit cc11f21ef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 150
      src/core/ext/xds/xds_client.cc
  2. 735
      test/core/xds/xds_client_test.cc
  3. 27
      test/core/xds/xds_transport_fake.cc
  4. 2
      test/core/xds/xds_transport_fake.h

@ -25,7 +25,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
@ -178,10 +177,45 @@ class XdsClient::ChannelState::AdsCallState
Unref(DEBUG_LOCATION, "Orphan");
}
void MarkSubscriptionSendStarted()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
subscription_sent_ = true;
}
void MaybeMarkSubscriptionSendComplete(
RefCountedPtr<AdsCallState> ads_calld)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (subscription_sent_) MaybeStartTimer(std::move(ads_calld));
}
void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
resource_seen_ = true;
MaybeCancelTimer();
}
void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (timer_handle_.has_value() &&
ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) {
timer_handle_.reset();
}
}
private:
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (!timer_start_needed_) return;
timer_start_needed_ = false;
// Don't start timer if we've already either seen the resource or
// marked it as non-existing.
// Note: There are edge cases where we can have seen the resource
// before we have sent the initial subscription request, such as
// when we unsubscribe and then resubscribe to a given resource
// and then get a response containing that resource, all while a
// send_message op is in flight.
if (resource_seen_) return;
// Don't start timer if we haven't yet sent the initial subscription
// request for the resource.
if (!subscription_sent_) return;
// Don't start timer if it's already running.
if (timer_handle_.has_value()) return;
// Check if we already have a cached version of this resource
// (i.e., if this is the initial request for the resource after an
// ADS stream restart). If so, we don't start the timer, because
@ -202,48 +236,28 @@ class XdsClient::ChannelState::AdsCallState
});
}
void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
// If the timer hasn't been started yet, make sure we don't start
// it later. This can happen if the last watch for a resource is
// cancelled and then restarted, both while an ADS request is
// being sent (causing the unsubscription and then resubscription
// requests to be queued), and then we get a response that
// contains that resource. In that case, we would call
// MaybeCancelTimer() when we receive the response and then
// MaybeStartTimer() when we finally send the new request, thus
// causing the timer to fire when it shouldn't. For details,
// see https://github.com/grpc/grpc/issues/29583.
timer_start_needed_ = false;
if (timer_handle_.has_value()) {
ads_calld_->xds_client()->engine()->Cancel(*timer_handle_);
timer_handle_.reset();
}
}
private:
void OnTimer() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri().c_str(),
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
.c_str());
}
{
MutexLock lock(&ads_calld_->xds_client()->mu_);
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri().c_str(),
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
.c_str());
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
timer_handle_.reset();
resource_seen_ = true;
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers);
}
ads_calld_->xds_client()->work_serializer_.DrainQueue();
ads_calld_.reset();
@ -253,7 +267,13 @@ class XdsClient::ChannelState::AdsCallState
const XdsResourceName name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool timer_start_needed_ ABSL_GUARDED_BY(&XdsClient::mu_) = true;
// True if we have sent the initial subscription request for this
// resource on this ADS stream.
bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
// True if we have either (a) seen the resource in a response on this
// stream or (b) declared the resource to not exist due to the timer
// firing.
bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
};
@ -308,7 +328,9 @@ class XdsClient::ChannelState::AdsCallState
bool sent_initial_message_ = false;
bool seen_response_ = false;
bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
const XdsResourceType* send_message_pending_
ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
// Resource types for which requests need to be sent.
std::set<const XdsResourceType*> buffered_requests_;
@ -768,7 +790,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(parsed_resource_name->key);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
res_it->second->MarkSeen();
}
}
}
@ -919,7 +941,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
const XdsResourceType* type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
// Buffer message sending if an existing message is in flight.
if (send_message_pending_) {
if (send_message_pending_ != nullptr) {
buffered_requests_.insert(type);
return;
}
@ -939,7 +961,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
}
state.status = absl::OkStatus();
call_->SendMessage(std::move(serialized_message));
send_message_pending_ = true;
send_message_pending_ = type;
}
void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
@ -972,7 +994,18 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) {
MutexLock lock(&xds_client()->mu_);
send_message_pending_ = false;
// For each resource that was in the message we just sent, start the
// resource timer if needed.
if (ok) {
auto& resource_type_state = state_map_[send_message_pending_];
for (const auto& p : resource_type_state.subscribed_resources) {
for (auto& q : p.second) {
q.second->MaybeMarkSubscriptionSendComplete(
Ref(DEBUG_LOCATION, "ResourceTimer"));
}
}
}
send_message_pending_ = nullptr;
if (ok && IsCurrentCallOnChannel()) {
// Continue to send another pending message if any.
// TODO(roth): The current code to handle buffered messages has the
@ -1104,13 +1137,26 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
xds_client(), chand()->server_.server_uri().c_str(), chand(),
this, call_.get(), status.ToString().c_str());
}
// Cancel any does-not-exist timers that may be pending.
for (const auto& p : state_map_) {
for (const auto& q : p.second.subscribed_resources) {
for (auto& r : q.second) {
r.second->MaybeCancelTimer();
}
}
}
// Ignore status from a stale call.
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
parent_->OnCallFinishedLocked();
// Send error to all watchers for the channel.
chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat(
"xDS call failed; status: %s", status.ToString().c_str())));
// If we didn't receive a response on the stream, report the
// stream failure as a connectivity failure, which will report the
// error to all watchers of resources on this channel.
if (!seen_response_) {
chand()->SetChannelStatusLocked(absl::UnavailableError(
absl::StrCat("xDS call failed with no responses received; status: ",
status.ToString())));
}
}
}
xds_client()->work_serializer_.DrainQueue();
@ -1136,7 +1182,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
authority, type->type_url(), resource_key));
OrphanablePtr<ResourceTimer>& resource_timer = p.second;
resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
resource_timer->MarkSubscriptionSendStarted();
}
}
}

@ -236,6 +236,7 @@ class XdsClientTest : public ::testing::Test {
all_resources_required_in_sotw>,
ResourceStruct>::WatcherInterface {
public:
// Returns true if no event is received during the timeout period.
bool ExpectNoEvent(absl::Duration timeout) {
MutexLock lock(&mu_);
return !WaitForEventLocked(timeout);
@ -323,6 +324,8 @@ class XdsClientTest : public ::testing::Test {
cv_.Signal();
}
// Returns true if an event was received, or false if the timeout
// expires before any event is received.
bool WaitForEventLocked(absl::Duration timeout)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
while (queue_.empty()) {
@ -562,13 +565,12 @@ class XdsClientTest : public ::testing::Test {
// specified bootstrap config.
void InitXdsClient(
FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(),
Duration resource_request_timeout = Duration::Seconds(15) *
grpc_test_slowdown_factor()) {
Duration resource_request_timeout = Duration::Seconds(15)) {
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>();
transport_factory_ = transport_factory->Ref();
xds_client_ = MakeRefCounted<XdsClient>(bootstrap_builder.Build(),
std::move(transport_factory),
resource_request_timeout);
xds_client_ = MakeRefCounted<XdsClient>(
bootstrap_builder.Build(), std::move(transport_factory),
resource_request_timeout * grpc_test_slowdown_factor());
}
// Starts and cancels a watch for a Foo resource.
@ -628,7 +630,8 @@ class XdsClientTest : public ::testing::Test {
absl::Status status) {
const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server);
GPR_ASSERT(xds_server != nullptr);
transport_factory_->TriggerConnectionFailure(*xds_server, status);
transport_factory_->TriggerConnectionFailure(*xds_server,
std::move(status));
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
@ -1427,178 +1430,6 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
}
}
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Do not send a response, but wait for the resource to be reported as
// not existing.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5)));
// Start a new watcher for the same resource. It should immediately
// receive the same does-not-exist notification.
auto watcher2 = StartFooWatch("foo1");
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
// Now server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
// In https://github.com/grpc/grpc/issues/29583, we ran into a case
// where we wound up starting a timer after we had already received the
// resource, thus incorrectly reporting the resource as not existing.
// This happened when unsubscribing and then resubscribing to the same
// resource a send_message op was already in flight and then receiving an
// update containing that resource.
TEST_F(XdsClientTest,
ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
// Tell transport to let us manually trigger completion of the
// send_message ops to XdsClient.
transport_factory_->SetAutoCompleteMessagesFromClient(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
stream->CompleteSendMessageFromClient();
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
stream->CompleteSendMessageFromClient();
// Start a watch for a second resource.
auto watcher2 = StartFooWatch("foo2");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher2->HasEvent());
// XdsClient sends a request to subscribe to the new resource.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// NOTE: We do NOT yet tell the XdsClient that the send_message op is
// complete.
// Unsubscribe from foo1 and then re-subscribe to it.
CancelFooWatch(watcher.get(), "foo1");
watcher = StartFooWatch("foo1");
// Now send a response from the server containing both foo1 and foo2.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("B")
.AddFooResource(XdsFooResource("foo1", 6))
.AddFooResource(XdsFooResource("foo2", 7))
.Serialize());
// The watcher for foo1 will receive an update even if the resource
// has not changed, since the previous value was removed from the
// cache when we unsubscribed.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// For foo2, the watcher should receive notification for the new resource.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// Now we finally tell XdsClient that its previous send_message op is
// complete.
stream->CompleteSendMessageFromClient();
// XdsClient should send an ACK with the updated subscription list
// (which happens to be identical to the old list), and it should not
// restart the does-not-exist timer.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
stream->CompleteSendMessageFromClient();
// Make sure the watcher for foo1 does not see a does-not-exist event.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5)));
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo2");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
// This tests resource removal triggered by the server when using a
// resource type that requires all resources to be present in every
// response, similar to LDS and CDS.
@ -1830,14 +1661,8 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
/*resource_names=*/{"foo1"});
// Now server closes the stream.
stream->MaybeSendStatusToClient(absl::OkStatus());
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
@ -1853,63 +1678,30 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
CheckRequestNode(*request); // Should be present on the first request.
// Before the server resends the resource, start a new watcher for the
// same resource. This watcher should immediately receive the cached
// resource and then the error notification -- in that order.
// resource.
auto watcher2 = StartFooWatch("foo1");
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
error = watcher2->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// Create a watcher for a new resource. This should immediately
// receive the cached channel error.
auto watcher3 = StartFooWatch("foo2");
error = watcher3->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// And the client will send a new request subscribing to the new resource.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// Server now sends the requested resources.
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("B")
.AddFooResource(XdsFooResource("foo1", 6))
.AddFooResource(XdsFooResource("foo2", 7))
.Serialize());
// Watchers for foo1 do NOT get an update, since the resource has not changed.
// Watcher does NOT get an update, since the resource has not changed.
EXPECT_FALSE(watcher->WaitForNextResource());
EXPECT_FALSE(watcher2->WaitForNextResource());
// The watcher for foo2 gets the newly delivered resource.
resource = watcher3->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// Cancel watchers.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher3.get(), "foo1");
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
@ -1920,10 +1712,8 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
}
}
TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
InitXdsClient();
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
@ -1939,70 +1729,69 @@ TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) {
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server sends a response.
// Server closes the stream without sending a response.
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
// XdsClient should report an error to the watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed "
"with no responses received; status: UNAVAILABLE: ugh "
"(node ID:xds_client_test)")
<< *error;
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient sends a subscription request.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
// Watcher gets the resource.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Now server closes the stream.
stream->MaybeSendStatusToClient(absl::OkStatus());
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed; "
"status: OK (node ID:xds_client_test)")
<< *error;
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient sends a subscription request.
// Note that the version persists from the previous stream, but the
// nonce does not.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send the resource again.
// Watcher should not get any update, since the resource has not changed.
// We wait 5s here to ensure we don't trigger the resource-does-not-exist
// timeout.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5)));
// Cancel watch.
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, ConnectionFails) {
InitXdsClient();
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport to let us manually trigger completion of the
// send_message ops to XdsClient.
transport_factory_->SetAutoCompleteMessagesFromClient(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
@ -2026,6 +1815,9 @@ TEST_F(XdsClientTest, ConnectionFails) {
"xDS channel for server default_xds_server: "
"connection failed (node ID:xds_client_test)")
<< *error;
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// Start a new watch. This watcher should be given the same error,
// since we have not yet recovered.
auto watcher2 = StartFooWatch("foo1");
@ -2036,11 +1828,11 @@ TEST_F(XdsClientTest, ConnectionFails) {
"xDS channel for server default_xds_server: "
"connection failed (node ID:xds_client_test)")
<< *error;
// Inside the XdsTransport interface, the channel will eventually
// reconnect, and the call will proceed. None of that will be visible
// to the XdsClient, because the call uses wait_for_ready. So here,
// to simulate the connection being established, all we need to do is
// allow the stream to proceed.
// Second watcher should not see resource-does-not-exist either.
EXPECT_FALSE(watcher2->HasEvent());
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel reconnects, the already-started stream will proceed.
stream->CompleteSendMessageFromClient();
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
@ -2060,6 +1852,407 @@ TEST_F(XdsClientTest, ConnectionFails) {
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
stream->CompleteSendMessageFromClient();
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
stream->CompleteSendMessageFromClient();
}
}
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Do not send a response, but wait for the resource to be reported as
// not existing.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5)));
// Start a new watcher for the same resource. It should immediately
// receive the same does-not-exist notification.
auto watcher2 = StartFooWatch("foo1");
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1)));
// Now server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
CancelFooWatch(watcher2.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
// Lower resources-does-not-exist timeout so test finishes faster.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Stream fails.
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
// XdsClient should report error to watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
EXPECT_EQ(error->message(),
"xDS channel for server default_xds_server: xDS call failed "
"with no responses received; status: UNAVAILABLE: ugh "
"(node ID:xds_client_test)")
<< *error;
// XdsClient should create a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient sends a subscription request.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send a response immediately.
// Client should receive a resource does-not-exist.
ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// The resource is delivered to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient sends an ACK.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Cancel watcher.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
}
}
TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Tell transport to let us manually trigger completion of the
// send_message ops to XdsClient.
transport_factory_->SetAutoCompleteMessagesFromClient(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send a response.
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
// so when the channel connects, the already-started stream will proceed.
stream->CompleteSendMessageFromClient();
// Server does NOT send a response.
// Watcher should see a does-not-exist event.
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4)));
// Now server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
stream->CompleteSendMessageFromClient();
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
stream->CompleteSendMessageFromClient();
}
}
// In https://github.com/grpc/grpc/issues/29583, we ran into a case
// where we wound up starting a timer after we had already received the
// resource, thus incorrectly reporting the resource as not existing.
// This happened when unsubscribing and then resubscribing to the same
// resource a send_message op was already in flight and then receiving an
// update containing that resource.
TEST_F(XdsClientTest,
ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1));
// Tell transport to let us manually trigger completion of the
// send_message ops to XdsClient.
transport_factory_->SetAutoCompleteMessagesFromClient(false);
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
stream->CompleteSendMessageFromClient();
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
stream->CompleteSendMessageFromClient();
// Start a watch for a second resource.
auto watcher2 = StartFooWatch("foo2");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher2->HasEvent());
// XdsClient sends a request to subscribe to the new resource.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
// NOTE: We do NOT yet tell the XdsClient that the send_message op is
// complete.
// Unsubscribe from foo1 and then re-subscribe to it.
CancelFooWatch(watcher.get(), "foo1");
watcher = StartFooWatch("foo1");
// Now send a response from the server containing both foo1 and foo2.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("B")
.AddFooResource(XdsFooResource("foo1", 6))
.AddFooResource(XdsFooResource("foo2", 7))
.Serialize());
// The watcher for foo1 will receive an update even if the resource
// has not changed, since the previous value was removed from the
// cache when we unsubscribed.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// For foo2, the watcher should receive notification for the new resource.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// Now we finally tell XdsClient that its previous send_message op is
// complete.
stream->CompleteSendMessageFromClient();
// XdsClient should send an ACK with the updated subscription list
// (which happens to be identical to the old list), and it should not
// restart the does-not-exist timer.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"});
stream->CompleteSendMessageFromClient();
// Make sure the watcher for foo1 does not see a does-not-exist event.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5)));
// Cancel watches.
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
CancelFooWatch(watcher2.get(), "foo2");
// The XdsClient may or may not send an unsubscription message
// before it closes the transport, depending on callback timing.
request = WaitForRequest(stream.get());
if (request.has_value()) {
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
stream->CompleteSendMessageFromClient();
}
}
TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
// Lower resources-does-not-exist timeout, to make sure that we're not
// triggering that here.
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3));
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
auto request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
// Stream fails because of transport disconnection.
stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
// XdsClient should NOT report error to watcher, because we saw a
// response on the stream before it failed.
// XdsClient creates a new stream.
stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
// XdsClient should have sent a subscription request on the ADS stream.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
CheckRequestNode(*request); // Should be present on the first request.
// Server does NOT send a response.
// We should not see a resource-does-not-exist event, because the
// resource was already cached, so the server can optimize by not
// resending it.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
// Now server sends a response.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("1")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// Watcher will not see any update, since the resource is unchanged.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1)));
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),

@ -42,21 +42,20 @@ namespace grpc_core {
// FakeXdsTransportFactory::FakeStreamingCall
//
FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
// Can't call event_handler_->OnStatusReceived() or unref event_handler_
// synchronously, since those operations will trigger code in
// XdsClient that acquires its mutex, but it was already holding its
// mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_),
status_sent = status_sent_]() mutable {
ExecCtx exec_ctx;
if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
event_handler.reset();
});
}
void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
{
MutexLock lock(&mu_);
// Can't call event_handler_->OnStatusReceived() or unref event_handler_
// synchronously, since those operations will trigger code in
// XdsClient that acquires its mutex, but it was already holding its
// mutex when it called us, so it would deadlock.
GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_),
status_sent = status_sent_]() mutable {
ExecCtx exec_ctx;
if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
event_handler.reset();
});
status_sent_ = true;
}
transport_->RemoveStream(method_, this);
Unref();
}

@ -63,6 +63,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
event_handler_(MakeRefCounted<RefCountedEventHandler>(
std::move(event_handler))) {}
~FakeStreamingCall() override;
void Orphan() override;
using StreamingCall::Ref; // Make it public.

Loading…
Cancel
Save