|
|
|
@ -236,7 +236,6 @@ class XdsClientTest : public ::testing::Test { |
|
|
|
|
all_resources_required_in_sotw>, |
|
|
|
|
ResourceStruct>::WatcherInterface { |
|
|
|
|
public: |
|
|
|
|
// Returns true if no event is received during the timeout period.
|
|
|
|
|
bool ExpectNoEvent(absl::Duration timeout) { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
return !WaitForEventLocked(timeout); |
|
|
|
@ -324,8 +323,6 @@ class XdsClientTest : public ::testing::Test { |
|
|
|
|
cv_.Signal(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Returns true if an event was received, or false if the timeout
|
|
|
|
|
// expires before any event is received.
|
|
|
|
|
bool WaitForEventLocked(absl::Duration timeout) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { |
|
|
|
|
while (queue_.empty()) { |
|
|
|
@ -565,12 +562,13 @@ class XdsClientTest : public ::testing::Test { |
|
|
|
|
// specified bootstrap config.
|
|
|
|
|
void InitXdsClient( |
|
|
|
|
FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), |
|
|
|
|
Duration resource_request_timeout = Duration::Seconds(15)) { |
|
|
|
|
Duration resource_request_timeout = Duration::Seconds(15) * |
|
|
|
|
grpc_test_slowdown_factor()) { |
|
|
|
|
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(); |
|
|
|
|
transport_factory_ = transport_factory->Ref(); |
|
|
|
|
xds_client_ = MakeRefCounted<XdsClient>( |
|
|
|
|
bootstrap_builder.Build(), std::move(transport_factory), |
|
|
|
|
resource_request_timeout * grpc_test_slowdown_factor()); |
|
|
|
|
xds_client_ = MakeRefCounted<XdsClient>(bootstrap_builder.Build(), |
|
|
|
|
std::move(transport_factory), |
|
|
|
|
resource_request_timeout); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Starts and cancels a watch for a Foo resource.
|
|
|
|
@ -630,8 +628,7 @@ class XdsClientTest : public ::testing::Test { |
|
|
|
|
absl::Status status) { |
|
|
|
|
const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); |
|
|
|
|
GPR_ASSERT(xds_server != nullptr); |
|
|
|
|
transport_factory_->TriggerConnectionFailure(*xds_server, |
|
|
|
|
std::move(status)); |
|
|
|
|
transport_factory_->TriggerConnectionFailure(*xds_server, status); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream( |
|
|
|
@ -1430,6 +1427,178 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { |
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Do not send a response, but wait for the resource to be reported as
|
|
|
|
|
// not existing.
|
|
|
|
|
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); |
|
|
|
|
// Start a new watcher for the same resource. It should immediately
|
|
|
|
|
// receive the same does-not-exist notification.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo1"); |
|
|
|
|
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); |
|
|
|
|
// Now server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watchers.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
resource = watcher2->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Cancel watch.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// In https://github.com/grpc/grpc/issues/29583, we ran into a case
|
|
|
|
|
// where we wound up starting a timer after we had already received the
|
|
|
|
|
// resource, thus incorrectly reporting the resource as not existing.
|
|
|
|
|
// This happened when unsubscribing and then resubscribing to the same
|
|
|
|
|
// resource a send_message op was already in flight and then receiving an
|
|
|
|
|
// update containing that resource.
|
|
|
|
|
TEST_F(XdsClientTest, |
|
|
|
|
ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) { |
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); |
|
|
|
|
// Tell transport to let us manually trigger completion of the
|
|
|
|
|
// send_message ops to XdsClient.
|
|
|
|
|
transport_factory_->SetAutoCompleteMessagesFromClient(false); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watchers.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Start a watch for a second resource.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo2"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher2->HasEvent()); |
|
|
|
|
// XdsClient sends a request to subscribe to the new resource.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
// NOTE: We do NOT yet tell the XdsClient that the send_message op is
|
|
|
|
|
// complete.
|
|
|
|
|
// Unsubscribe from foo1 and then re-subscribe to it.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
watcher = StartFooWatch("foo1"); |
|
|
|
|
// Now send a response from the server containing both foo1 and foo2.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("B") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.AddFooResource(XdsFooResource("foo2", 7)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// The watcher for foo1 will receive an update even if the resource
|
|
|
|
|
// has not changed, since the previous value was removed from the
|
|
|
|
|
// cache when we unsubscribed.
|
|
|
|
|
resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// For foo2, the watcher should receive notification for the new resource.
|
|
|
|
|
resource = watcher2->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo2"); |
|
|
|
|
EXPECT_EQ(resource->value, 7); |
|
|
|
|
// Now we finally tell XdsClient that its previous send_message op is
|
|
|
|
|
// complete.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// XdsClient should send an ACK with the updated subscription list
|
|
|
|
|
// (which happens to be identical to the old list), and it should not
|
|
|
|
|
// restart the does-not-exist timer.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"B", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Make sure the watcher for foo1 does not see a does-not-exist event.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); |
|
|
|
|
// Cancel watches.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo2"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"B", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This tests resource removal triggered by the server when using a
|
|
|
|
|
// resource type that requires all resources to be present in every
|
|
|
|
|
// response, similar to LDS and CDS.
|
|
|
|
@ -1661,8 +1830,14 @@ TEST_F(XdsClientTest, StreamClosedByServer) { |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Now server closes the stream.
|
|
|
|
|
stream->MaybeSendStatusToClient(absl::OkStatus()); |
|
|
|
|
// XdsClient should NOT report error to watcher, because we saw a
|
|
|
|
|
// response on the stream before it failed.
|
|
|
|
|
// XdsClient should report error to watcher.
|
|
|
|
|
auto error = watcher->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed; " |
|
|
|
|
"status: OK (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// XdsClient should create a new stream.
|
|
|
|
|
stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
@ -1678,30 +1853,63 @@ TEST_F(XdsClientTest, StreamClosedByServer) { |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Before the server resends the resource, start a new watcher for the
|
|
|
|
|
// same resource. This watcher should immediately receive the cached
|
|
|
|
|
// resource.
|
|
|
|
|
// resource and then the error notification -- in that order.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo1"); |
|
|
|
|
resource = watcher2->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// Server now sends the requested resource.
|
|
|
|
|
error = watcher2->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed; " |
|
|
|
|
"status: OK (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// Create a watcher for a new resource. This should immediately
|
|
|
|
|
// receive the cached channel error.
|
|
|
|
|
auto watcher3 = StartFooWatch("foo2"); |
|
|
|
|
error = watcher3->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed; " |
|
|
|
|
"status: OK (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// And the client will send a new request subscribing to the new resource.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
// Server now sends the requested resources.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("B") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.AddFooResource(XdsFooResource("foo2", 7)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// Watcher does NOT get an update, since the resource has not changed.
|
|
|
|
|
// Watchers for foo1 do NOT get an update, since the resource has not changed.
|
|
|
|
|
EXPECT_FALSE(watcher->WaitForNextResource()); |
|
|
|
|
EXPECT_FALSE(watcher2->WaitForNextResource()); |
|
|
|
|
// The watcher for foo2 gets the newly delivered resource.
|
|
|
|
|
resource = watcher3->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo2"); |
|
|
|
|
EXPECT_EQ(resource->value, 7); |
|
|
|
|
// XdsClient sends an ACK.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"B", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Cancel watcher.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
// Cancel watchers.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true); |
|
|
|
|
CancelFooWatch(watcher3.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
@ -1712,8 +1920,10 @@ TEST_F(XdsClientTest, StreamClosedByServer) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) { |
|
|
|
|
InitXdsClient(); |
|
|
|
|
TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) { |
|
|
|
|
// Lower resources-does-not-exist timeout, to make sure that we're not
|
|
|
|
|
// triggering that here.
|
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
@ -1729,66 +1939,67 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) { |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server closes the stream without sending a response.
|
|
|
|
|
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); |
|
|
|
|
// XdsClient should report an error to the watcher.
|
|
|
|
|
auto error = watcher->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed " |
|
|
|
|
"with no responses received; status: UNAVAILABLE: ugh " |
|
|
|
|
"(node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// XdsClient should create a new stream.
|
|
|
|
|
stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient sends a subscription request.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server now sends the requested resource.
|
|
|
|
|
// Server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// Watcher gets the resource.
|
|
|
|
|
// XdsClient should have delivered the response to the watcher.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient sends an ACK.
|
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Cancel watcher.
|
|
|
|
|
// Now server closes the stream.
|
|
|
|
|
stream->MaybeSendStatusToClient(absl::OkStatus()); |
|
|
|
|
// XdsClient should report error to watcher.
|
|
|
|
|
auto error = watcher->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed; " |
|
|
|
|
"status: OK (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// XdsClient should create a new stream.
|
|
|
|
|
stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient sends a subscription request.
|
|
|
|
|
// Note that the version persists from the previous stream, but the
|
|
|
|
|
// nonce does not.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server does NOT send the resource again.
|
|
|
|
|
// Watcher should not get any update, since the resource has not changed.
|
|
|
|
|
// We wait 5s here to ensure we don't trigger the resource-does-not-exist
|
|
|
|
|
// timeout.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); |
|
|
|
|
// Cancel watch.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, ConnectionFails) { |
|
|
|
|
// Lower resources-does-not-exist timeout, to make sure that we're not
|
|
|
|
|
// triggering that here.
|
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); |
|
|
|
|
// Tell transport to let us manually trigger completion of the
|
|
|
|
|
// send_message ops to XdsClient.
|
|
|
|
|
transport_factory_->SetAutoCompleteMessagesFromClient(false); |
|
|
|
|
InitXdsClient(); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
@ -1815,9 +2026,6 @@ TEST_F(XdsClientTest, ConnectionFails) { |
|
|
|
|
"xDS channel for server default_xds_server: " |
|
|
|
|
"connection failed (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// We should not see a resource-does-not-exist event, because the
|
|
|
|
|
// timer should not be running while the channel is disconnected.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); |
|
|
|
|
// Start a new watch. This watcher should be given the same error,
|
|
|
|
|
// since we have not yet recovered.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo1"); |
|
|
|
@ -1828,11 +2036,11 @@ TEST_F(XdsClientTest, ConnectionFails) { |
|
|
|
|
"xDS channel for server default_xds_server: " |
|
|
|
|
"connection failed (node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// Second watcher should not see resource-does-not-exist either.
|
|
|
|
|
EXPECT_FALSE(watcher2->HasEvent()); |
|
|
|
|
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
|
|
|
|
|
// so when the channel reconnects, the already-started stream will proceed.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Inside the XdsTransport interface, the channel will eventually
|
|
|
|
|
// reconnect, and the call will proceed. None of that will be visible
|
|
|
|
|
// to the XdsClient, because the call uses wait_for_ready. So here,
|
|
|
|
|
// to simulate the connection being established, all we need to do is
|
|
|
|
|
// allow the stream to proceed.
|
|
|
|
|
// Server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
@ -1852,407 +2060,6 @@ TEST_F(XdsClientTest, ConnectionFails) { |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Cancel watches.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { |
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Do not send a response, but wait for the resource to be reported as
|
|
|
|
|
// not existing.
|
|
|
|
|
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); |
|
|
|
|
// Start a new watcher for the same resource. It should immediately
|
|
|
|
|
// receive the same does-not-exist notification.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo1"); |
|
|
|
|
EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); |
|
|
|
|
// Now server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watchers.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
resource = watcher2->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Cancel watch.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) { |
|
|
|
|
// Lower resources-does-not-exist timeout so test finishes faster.
|
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Stream fails.
|
|
|
|
|
stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); |
|
|
|
|
// XdsClient should report error to watcher.
|
|
|
|
|
auto error = watcher->WaitForNextError(); |
|
|
|
|
ASSERT_TRUE(error.has_value()); |
|
|
|
|
EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); |
|
|
|
|
EXPECT_EQ(error->message(), |
|
|
|
|
"xDS channel for server default_xds_server: xDS call failed " |
|
|
|
|
"with no responses received; status: UNAVAILABLE: ugh " |
|
|
|
|
"(node ID:xds_client_test)") |
|
|
|
|
<< *error; |
|
|
|
|
// XdsClient should create a new stream.
|
|
|
|
|
stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient sends a subscription request.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server does NOT send a response immediately.
|
|
|
|
|
// Client should receive a resource does-not-exist.
|
|
|
|
|
ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); |
|
|
|
|
// Server now sends the requested resource.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// The resource is delivered to the watcher.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient sends an ACK.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Cancel watcher.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) { |
|
|
|
|
// Lower resources-does-not-exist timeout, to make sure that we're not
|
|
|
|
|
// triggering that here.
|
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); |
|
|
|
|
// Tell transport to let us manually trigger completion of the
|
|
|
|
|
// send_message ops to XdsClient.
|
|
|
|
|
transport_factory_->SetAutoCompleteMessagesFromClient(false); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server does NOT send a response.
|
|
|
|
|
// We should not see a resource-does-not-exist event, because the
|
|
|
|
|
// timer should not be running while the channel is disconnected.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); |
|
|
|
|
// The ADS stream uses wait_for_ready inside the XdsTransport interface,
|
|
|
|
|
// so when the channel connects, the already-started stream will proceed.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Server does NOT send a response.
|
|
|
|
|
// Watcher should see a does-not-exist event.
|
|
|
|
|
EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); |
|
|
|
|
// Now server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watcher.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Cancel watch.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// In https://github.com/grpc/grpc/issues/29583, we ran into a case
|
|
|
|
|
// where we wound up starting a timer after we had already received the
|
|
|
|
|
// resource, thus incorrectly reporting the resource as not existing.
|
|
|
|
|
// This happened when unsubscribing and then resubscribing to the same
|
|
|
|
|
// resource a send_message op was already in flight and then receiving an
|
|
|
|
|
// update containing that resource.
|
|
|
|
|
TEST_F(XdsClientTest, |
|
|
|
|
ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) { |
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); |
|
|
|
|
// Tell transport to let us manually trigger completion of the
|
|
|
|
|
// send_message ops to XdsClient.
|
|
|
|
|
transport_factory_->SetAutoCompleteMessagesFromClient(false); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watchers.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Start a watch for a second resource.
|
|
|
|
|
auto watcher2 = StartFooWatch("foo2"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher2->HasEvent()); |
|
|
|
|
// XdsClient sends a request to subscribe to the new resource.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
// NOTE: We do NOT yet tell the XdsClient that the send_message op is
|
|
|
|
|
// complete.
|
|
|
|
|
// Unsubscribe from foo1 and then re-subscribe to it.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1"); |
|
|
|
|
watcher = StartFooWatch("foo1"); |
|
|
|
|
// Now send a response from the server containing both foo1 and foo2.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("B") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.AddFooResource(XdsFooResource("foo2", 7)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// The watcher for foo1 will receive an update even if the resource
|
|
|
|
|
// has not changed, since the previous value was removed from the
|
|
|
|
|
// cache when we unsubscribed.
|
|
|
|
|
resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// For foo2, the watcher should receive notification for the new resource.
|
|
|
|
|
resource = watcher2->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo2"); |
|
|
|
|
EXPECT_EQ(resource->value, 7); |
|
|
|
|
// Now we finally tell XdsClient that its previous send_message op is
|
|
|
|
|
// complete.
|
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// XdsClient should send an ACK with the updated subscription list
|
|
|
|
|
// (which happens to be identical to the old list), and it should not
|
|
|
|
|
// restart the does-not-exist timer.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"B", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1", "foo2"}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
// Make sure the watcher for foo1 does not see a does-not-exist event.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); |
|
|
|
|
// Cancel watches.
|
|
|
|
|
CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); |
|
|
|
|
CancelFooWatch(watcher2.get(), "foo2"); |
|
|
|
|
// The XdsClient may or may not send an unsubscription message
|
|
|
|
|
// before it closes the transport, depending on callback timing.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
if (request.has_value()) { |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"B", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); |
|
|
|
|
stream->CompleteSendMessageFromClient(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) { |
|
|
|
|
// Lower resources-does-not-exist timeout, to make sure that we're not
|
|
|
|
|
// triggering that here.
|
|
|
|
|
InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); |
|
|
|
|
// Start a watch for "foo1".
|
|
|
|
|
auto watcher = StartFooWatch("foo1"); |
|
|
|
|
// Watcher should initially not see any resource reported.
|
|
|
|
|
EXPECT_FALSE(watcher->HasEvent()); |
|
|
|
|
// XdsClient should have created an ADS stream.
|
|
|
|
|
auto stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
auto request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// XdsClient should have delivered the response to the watcher.
|
|
|
|
|
auto resource = watcher->WaitForNextResource(); |
|
|
|
|
ASSERT_TRUE(resource.has_value()); |
|
|
|
|
EXPECT_EQ(resource->name, "foo1"); |
|
|
|
|
EXPECT_EQ(resource->value, 6); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
// Stream fails because of transport disconnection.
|
|
|
|
|
stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed")); |
|
|
|
|
// XdsClient should NOT report error to watcher, because we saw a
|
|
|
|
|
// response on the stream before it failed.
|
|
|
|
|
// XdsClient creates a new stream.
|
|
|
|
|
stream = WaitForAdsStream(); |
|
|
|
|
ASSERT_TRUE(stream != nullptr); |
|
|
|
|
// XdsClient should have sent a subscription request on the ADS stream.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|
/*resource_names=*/{"foo1"}); |
|
|
|
|
CheckRequestNode(*request); // Should be present on the first request.
|
|
|
|
|
// Server does NOT send a response.
|
|
|
|
|
// We should not see a resource-does-not-exist event, because the
|
|
|
|
|
// resource was already cached, so the server can optimize by not
|
|
|
|
|
// resending it.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); |
|
|
|
|
// Now server sends a response.
|
|
|
|
|
stream->SendMessageToClient( |
|
|
|
|
ResponseBuilder(XdsFooResourceType::Get()->type_url()) |
|
|
|
|
.set_version_info("1") |
|
|
|
|
.set_nonce("A") |
|
|
|
|
.AddFooResource(XdsFooResource("foo1", 6)) |
|
|
|
|
.Serialize()); |
|
|
|
|
// Watcher will not see any update, since the resource is unchanged.
|
|
|
|
|
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); |
|
|
|
|
// XdsClient should have sent an ACK message to the xDS server.
|
|
|
|
|
request = WaitForRequest(stream.get()); |
|
|
|
|
ASSERT_TRUE(request.has_value()); |
|
|
|
|
CheckRequest(*request, XdsFooResourceType::Get()->type_url(), |
|
|
|
|
/*version_info=*/"1", /*response_nonce=*/"A", |
|
|
|
|
/*error_detail=*/absl::OkStatus(), |
|
|
|
|