diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 59eb879bcc4..d538fbf3587 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -25,6 +25,7 @@ #include "absl/strings/match.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "absl/strings/string_view.h" @@ -177,45 +178,10 @@ class XdsClient::ChannelState::AdsCallState Unref(DEBUG_LOCATION, "Orphan"); } - void MarkSubscriptionSendStarted() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - subscription_sent_ = true; - } - - void MaybeMarkSubscriptionSendComplete( - RefCountedPtr ads_calld) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - if (subscription_sent_) MaybeStartTimer(std::move(ads_calld)); - } - - void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - resource_seen_ = true; - MaybeCancelTimer(); - } - - void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - if (timer_handle_.has_value() && - ads_calld_->xds_client()->engine()->Cancel(*timer_handle_)) { - timer_handle_.reset(); - } - } - - private: void MaybeStartTimer(RefCountedPtr ads_calld) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { - // Don't start timer if we've already either seen the resource or - // marked it as non-existing. - // Note: There are edge cases where we can have seen the resource - // before we have sent the initial subscription request, such as - // when we unsubscribe and then resubscribe to a given resource - // and then get a response containing that resource, all while a - // send_message op is in flight. - if (resource_seen_) return; - // Don't start timer if we haven't yet sent the initial subscription - // request for the resource. - if (!subscription_sent_) return; - // Don't start timer if it's already running. - if (timer_handle_.has_value()) return; + if (!timer_start_needed_) return; + timer_start_needed_ = false; // Check if we already have a cached version of this resource // (i.e., if this is the initial request for the resource after an // ADS stream restart). If so, we don't start the timer, because @@ -236,28 +202,48 @@ class XdsClient::ChannelState::AdsCallState }); } - void OnTimer() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] xds server %s: timeout obtaining resource " - "{type=%s name=%s} from xds server", - ads_calld_->xds_client(), - ads_calld_->chand()->server_.server_uri().c_str(), - std::string(type_->type_url()).c_str(), - XdsClient::ConstructFullXdsResourceName( - name_.authority, type_->type_url(), name_.key) - .c_str()); + void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { + // If the timer hasn't been started yet, make sure we don't start + // it later. This can happen if the last watch for a resource is + // cancelled and then restarted, both while an ADS request is + // being sent (causing the unsubscription and then resubscription + // requests to be queued), and then we get a response that + // contains that resource. In that case, we would call + // MaybeCancelTimer() when we receive the response and then + // MaybeStartTimer() when we finally send the new request, thus + // causing the timer to fire when it shouldn't. For details, + // see https://github.com/grpc/grpc/issues/29583. + timer_start_needed_ = false; + if (timer_handle_.has_value()) { + ads_calld_->xds_client()->engine()->Cancel(*timer_handle_); + timer_handle_.reset(); } + } + + private: + void OnTimer() { { MutexLock lock(&ads_calld_->xds_client()->mu_); - timer_handle_.reset(); - resource_seen_ = true; - auto& authority_state = - ads_calld_->xds_client()->authority_state_map_[name_.authority]; - ResourceState& state = authority_state.resource_map[type_][name_.key]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( - state.watchers); + if (timer_handle_.has_value()) { + timer_handle_.reset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: timeout obtaining resource " + "{type=%s name=%s} from xds server", + ads_calld_->xds_client(), + ads_calld_->chand()->server_.server_uri().c_str(), + std::string(type_->type_url()).c_str(), + XdsClient::ConstructFullXdsResourceName( + name_.authority, type_->type_url(), name_.key) + .c_str()); + } + auto& authority_state = + ads_calld_->xds_client()->authority_state_map_[name_.authority]; + ResourceState& state = authority_state.resource_map[type_][name_.key]; + state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; + ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist( + state.watchers); + } } ads_calld_->xds_client()->work_serializer_.DrainQueue(); ads_calld_.reset(); @@ -267,13 +253,7 @@ class XdsClient::ChannelState::AdsCallState const XdsResourceName name_; RefCountedPtr ads_calld_; - // True if we have sent the initial subscription request for this - // resource on this ADS stream. - bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; - // True if we have either (a) seen the resource in a response on this - // stream or (b) declared the resource to not exist due to the timer - // firing. - bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; + bool timer_start_needed_ ABSL_GUARDED_BY(&XdsClient::mu_) = true; absl::optional timer_handle_ ABSL_GUARDED_BY(&XdsClient::mu_); }; @@ -328,9 +308,7 @@ class XdsClient::ChannelState::AdsCallState bool sent_initial_message_ = false; bool seen_response_ = false; - - const XdsResourceType* send_message_pending_ - ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr; + bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false; // Resource types for which requests need to be sent. std::set buffered_requests_; @@ -790,7 +768,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( if (it != timer_it->second.subscribed_resources.end()) { auto res_it = it->second.find(parsed_resource_name->key); if (res_it != it->second.end()) { - res_it->second->MarkSeen(); + res_it->second->MaybeCancelTimer(); } } } @@ -941,7 +919,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Buffer message sending if an existing message is in flight. - if (send_message_pending_ != nullptr) { + if (send_message_pending_) { buffered_requests_.insert(type); return; } @@ -961,7 +939,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } state.status = absl::OkStatus(); call_->SendMessage(std::move(serialized_message)); - send_message_pending_ = type; + send_message_pending_ = true; } void XdsClient::ChannelState::AdsCallState::SubscribeLocked( @@ -994,18 +972,7 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) { MutexLock lock(&xds_client()->mu_); - // For each resource that was in the message we just sent, start the - // resource timer if needed. - if (ok) { - auto& resource_type_state = state_map_[send_message_pending_]; - for (const auto& p : resource_type_state.subscribed_resources) { - for (auto& q : p.second) { - q.second->MaybeMarkSubscriptionSendComplete( - Ref(DEBUG_LOCATION, "ResourceTimer")); - } - } - } - send_message_pending_ = nullptr; + send_message_pending_ = false; if (ok && IsCurrentCallOnChannel()) { // Continue to send another pending message if any. // TODO(roth): The current code to handle buffered messages has the @@ -1137,26 +1104,13 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( xds_client(), chand()->server_.server_uri().c_str(), chand(), this, call_.get(), status.ToString().c_str()); } - // Cancel any does-not-exist timers that may be pending. - for (const auto& p : state_map_) { - for (const auto& q : p.second.subscribed_resources) { - for (auto& r : q.second) { - r.second->MaybeCancelTimer(); - } - } - } // Ignore status from a stale call. if (IsCurrentCallOnChannel()) { // Try to restart the call. parent_->OnCallFinishedLocked(); - // If we didn't receive a response on the stream, report the - // stream failure as a connectivity failure, which will report the - // error to all watchers of resources on this channel. - if (!seen_response_) { - chand()->SetChannelStatusLocked(absl::UnavailableError( - absl::StrCat("xDS call failed with no responses received; status: ", - status.ToString()))); - } + // Send error to all watchers for the channel. + chand()->SetChannelStatusLocked(absl::UnavailableError(absl::StrFormat( + "xDS call failed; status: %s", status.ToString().c_str()))); } } xds_client()->work_serializer_.DrainQueue(); @@ -1182,7 +1136,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName( authority, type->type_url(), resource_key)); OrphanablePtr& resource_timer = p.second; - resource_timer->MarkSubscriptionSendStarted(); + resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer")); } } } diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index e767d18309e..bde8173cb3b 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -236,7 +236,6 @@ class XdsClientTest : public ::testing::Test { all_resources_required_in_sotw>, ResourceStruct>::WatcherInterface { public: - // Returns true if no event is received during the timeout period. bool ExpectNoEvent(absl::Duration timeout) { MutexLock lock(&mu_); return !WaitForEventLocked(timeout); @@ -324,8 +323,6 @@ class XdsClientTest : public ::testing::Test { cv_.Signal(); } - // Returns true if an event was received, or false if the timeout - // expires before any event is received. bool WaitForEventLocked(absl::Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { while (queue_.empty()) { @@ -565,12 +562,13 @@ class XdsClientTest : public ::testing::Test { // specified bootstrap config. void InitXdsClient( FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), - Duration resource_request_timeout = Duration::Seconds(15)) { + Duration resource_request_timeout = Duration::Seconds(15) * + grpc_test_slowdown_factor()) { auto transport_factory = MakeOrphanable(); transport_factory_ = transport_factory->Ref(); - xds_client_ = MakeRefCounted( - bootstrap_builder.Build(), std::move(transport_factory), - resource_request_timeout * grpc_test_slowdown_factor()); + xds_client_ = MakeRefCounted(bootstrap_builder.Build(), + std::move(transport_factory), + resource_request_timeout); } // Starts and cancels a watch for a Foo resource. @@ -630,8 +628,7 @@ class XdsClientTest : public ::testing::Test { absl::Status status) { const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); GPR_ASSERT(xds_server != nullptr); - transport_factory_->TriggerConnectionFailure(*xds_server, - std::move(status)); + transport_factory_->TriggerConnectionFailure(*xds_server, status); } RefCountedPtr WaitForAdsStream( @@ -1430,6 +1427,178 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { } } +TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { + InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Do not send a response, but wait for the resource to be reported as + // not existing. + EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); + // Start a new watcher for the same resource. It should immediately + // receive the same does-not-exist notification. + auto watcher2 = StartFooWatch("foo1"); + EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); + // Now server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + +// In https://github.com/grpc/grpc/issues/29583, we ran into a case +// where we wound up starting a timer after we had already received the +// resource, thus incorrectly reporting the resource as not existing. +// This happened when unsubscribing and then resubscribing to the same +// resource a send_message op was already in flight and then receiving an +// update containing that resource. +TEST_F(XdsClientTest, + ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) { + InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); + // Tell transport to let us manually trigger completion of the + // send_message ops to XdsClient. + transport_factory_->SetAutoCompleteMessagesFromClient(false); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + stream->CompleteSendMessageFromClient(); + // Server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + stream->CompleteSendMessageFromClient(); + // Start a watch for a second resource. + auto watcher2 = StartFooWatch("foo2"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher2->HasEvent()); + // XdsClient sends a request to subscribe to the new resource. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // NOTE: We do NOT yet tell the XdsClient that the send_message op is + // complete. + // Unsubscribe from foo1 and then re-subscribe to it. + CancelFooWatch(watcher.get(), "foo1"); + watcher = StartFooWatch("foo1"); + // Now send a response from the server containing both foo1 and foo2. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("B") + .AddFooResource(XdsFooResource("foo1", 6)) + .AddFooResource(XdsFooResource("foo2", 7)) + .Serialize()); + // The watcher for foo1 will receive an update even if the resource + // has not changed, since the previous value was removed from the + // cache when we unsubscribed. + resource = watcher->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // For foo2, the watcher should receive notification for the new resource. + resource = watcher2->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo2"); + EXPECT_EQ(resource->value, 7); + // Now we finally tell XdsClient that its previous send_message op is + // complete. + stream->CompleteSendMessageFromClient(); + // XdsClient should send an ACK with the updated subscription list + // (which happens to be identical to the old list), and it should not + // restart the does-not-exist timer. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + stream->CompleteSendMessageFromClient(); + // Make sure the watcher for foo1 does not see a does-not-exist event. + EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); + // Cancel watches. + CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher2.get(), "foo2"); + // The XdsClient may or may not send an unsubscription message + // before it closes the transport, depending on callback timing. + request = WaitForRequest(stream.get()); + if (request.has_value()) { + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); + } +} + // This tests resource removal triggered by the server when using a // resource type that requires all resources to be present in every // response, similar to LDS and CDS. @@ -1661,8 +1830,14 @@ TEST_F(XdsClientTest, StreamClosedByServer) { /*resource_names=*/{"foo1"}); // Now server closes the stream. stream->MaybeSendStatusToClient(absl::OkStatus()); - // XdsClient should NOT report error to watcher, because we saw a - // response on the stream before it failed. + // XdsClient should report error to watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; // XdsClient should create a new stream. stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); @@ -1678,30 +1853,63 @@ TEST_F(XdsClientTest, StreamClosedByServer) { CheckRequestNode(*request); // Should be present on the first request. // Before the server resends the resource, start a new watcher for the // same resource. This watcher should immediately receive the cached - // resource. + // resource and then the error notification -- in that order. auto watcher2 = StartFooWatch("foo1"); resource = watcher2->WaitForNextResource(); ASSERT_TRUE(resource.has_value()); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); - // Server now sends the requested resource. + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // Create a watcher for a new resource. This should immediately + // receive the cached channel error. + auto watcher3 = StartFooWatch("foo2"); + error = watcher3->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // And the client will send a new request subscribing to the new resource. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + // Server now sends the requested resources. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 6)) + .AddFooResource(XdsFooResource("foo2", 7)) .Serialize()); - // Watcher does NOT get an update, since the resource has not changed. + // Watchers for foo1 do NOT get an update, since the resource has not changed. EXPECT_FALSE(watcher->WaitForNextResource()); + EXPECT_FALSE(watcher2->WaitForNextResource()); + // The watcher for foo2 gets the newly delivered resource. + resource = watcher3->WaitForNextResource(); + ASSERT_TRUE(resource.has_value()); + EXPECT_EQ(resource->name, "foo2"); + EXPECT_EQ(resource->value, 7); // XdsClient sends an ACK. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watcher. - CancelFooWatch(watcher.get(), "foo1"); + /*resource_names=*/{"foo1", "foo2"}); + // Cancel watchers. + CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher2.get(), "foo1", /*delay_unsubscription=*/true); + CancelFooWatch(watcher3.get(), "foo1"); // The XdsClient may or may not send an unsubscription message // before it closes the transport, depending on callback timing. request = WaitForRequest(stream.get()); @@ -1712,8 +1920,10 @@ TEST_F(XdsClientTest, StreamClosedByServer) { } } -TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) { - InitXdsClient(); +TEST_F(XdsClientTest, StreamClosedByServerAndResourcesNotResentOnNewStream) { + // Lower resources-does-not-exist timeout, to make sure that we're not + // triggering that here. + InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. @@ -1729,66 +1939,67 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) { /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. - // Server closes the stream without sending a response. - stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); - // XdsClient should report an error to the watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed " - "with no responses received; status: UNAVAILABLE: ugh " - "(node ID:xds_client_test)") - << *error; - // XdsClient should create a new stream. - stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient sends a subscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server now sends the requested resource. + // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); - // Watcher gets the resource. + // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_TRUE(resource.has_value()); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); - // XdsClient sends an ACK. + // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); - // Cancel watcher. + // Now server closes the stream. + stream->MaybeSendStatusToClient(absl::OkStatus()); + // XdsClient should report error to watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); + EXPECT_EQ(error->message(), + "xDS channel for server default_xds_server: xDS call failed; " + "status: OK (node ID:xds_client_test)") + << *error; + // XdsClient should create a new stream. + stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient sends a subscription request. + // Note that the version persists from the previous stream, but the + // nonce does not. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Server does NOT send the resource again. + // Watcher should not get any update, since the resource has not changed. + // We wait 5s here to ensure we don't trigger the resource-does-not-exist + // timeout. + EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); + // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); // The XdsClient may or may not send an unsubscription message // before it closes the transport, depending on callback timing. request = WaitForRequest(stream.get()); if (request.has_value()) { CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", + /*version_info=*/"1", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); } } TEST_F(XdsClientTest, ConnectionFails) { - // Lower resources-does-not-exist timeout, to make sure that we're not - // triggering that here. - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); - // Tell transport to let us manually trigger completion of the - // send_message ops to XdsClient. - transport_factory_->SetAutoCompleteMessagesFromClient(false); + InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. @@ -1815,9 +2026,6 @@ TEST_F(XdsClientTest, ConnectionFails) { "xDS channel for server default_xds_server: " "connection failed (node ID:xds_client_test)") << *error; - // We should not see a resource-does-not-exist event, because the - // timer should not be running while the channel is disconnected. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); // Start a new watch. This watcher should be given the same error, // since we have not yet recovered. auto watcher2 = StartFooWatch("foo1"); @@ -1828,11 +2036,11 @@ TEST_F(XdsClientTest, ConnectionFails) { "xDS channel for server default_xds_server: " "connection failed (node ID:xds_client_test)") << *error; - // Second watcher should not see resource-does-not-exist either. - EXPECT_FALSE(watcher2->HasEvent()); - // The ADS stream uses wait_for_ready inside the XdsTransport interface, - // so when the channel reconnects, the already-started stream will proceed. - stream->CompleteSendMessageFromClient(); + // Inside the XdsTransport interface, the channel will eventually + // reconnect, and the call will proceed. None of that will be visible + // to the XdsClient, because the call uses wait_for_ready. So here, + // to simulate the connection being established, all we need to do is + // allow the stream to proceed. // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) @@ -1852,407 +2060,6 @@ TEST_F(XdsClientTest, ConnectionFails) { // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - stream->CompleteSendMessageFromClient(); - // Cancel watches. - CancelFooWatch(watcher.get(), "foo1"); - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - stream->CompleteSendMessageFromClient(); - } -} - -TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Do not send a response, but wait for the resource to be reported as - // not existing. - EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); - // Start a new watcher for the same resource. It should immediately - // receive the same does-not-exist notification. - auto watcher2 = StartFooWatch("foo1"); - EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); - // Now server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // XdsClient should have delivered the response to the watchers. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - CancelFooWatch(watcher2.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) { - // Lower resources-does-not-exist timeout so test finishes faster. - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Stream fails. - stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); - // XdsClient should report error to watcher. - auto error = watcher->WaitForNextError(); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); - EXPECT_EQ(error->message(), - "xDS channel for server default_xds_server: xDS call failed " - "with no responses received; status: UNAVAILABLE: ugh " - "(node ID:xds_client_test)") - << *error; - // XdsClient should create a new stream. - stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient sends a subscription request. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server does NOT send a response immediately. - // Client should receive a resource does-not-exist. - ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); - // Server now sends the requested resource. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // The resource is delivered to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient sends an ACK. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Cancel watcher. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - } -} - -TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) { - // Lower resources-does-not-exist timeout, to make sure that we're not - // triggering that here. - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); - // Tell transport to let us manually trigger completion of the - // send_message ops to XdsClient. - transport_factory_->SetAutoCompleteMessagesFromClient(false); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server does NOT send a response. - // We should not see a resource-does-not-exist event, because the - // timer should not be running while the channel is disconnected. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); - // The ADS stream uses wait_for_ready inside the XdsTransport interface, - // so when the channel connects, the already-started stream will proceed. - stream->CompleteSendMessageFromClient(); - // Server does NOT send a response. - // Watcher should see a does-not-exist event. - EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); - // Now server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - stream->CompleteSendMessageFromClient(); - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - stream->CompleteSendMessageFromClient(); - } -} - -// In https://github.com/grpc/grpc/issues/29583, we ran into a case -// where we wound up starting a timer after we had already received the -// resource, thus incorrectly reporting the resource as not existing. -// This happened when unsubscribing and then resubscribing to the same -// resource a send_message op was already in flight and then receiving an -// update containing that resource. -TEST_F(XdsClientTest, - ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) { - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); - // Tell transport to let us manually trigger completion of the - // send_message ops to XdsClient. - transport_factory_->SetAutoCompleteMessagesFromClient(false); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - stream->CompleteSendMessageFromClient(); - // Server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // XdsClient should have delivered the response to the watchers. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - stream->CompleteSendMessageFromClient(); - // Start a watch for a second resource. - auto watcher2 = StartFooWatch("foo2"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher2->HasEvent()); - // XdsClient sends a request to subscribe to the new resource. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - // NOTE: We do NOT yet tell the XdsClient that the send_message op is - // complete. - // Unsubscribe from foo1 and then re-subscribe to it. - CancelFooWatch(watcher.get(), "foo1"); - watcher = StartFooWatch("foo1"); - // Now send a response from the server containing both foo1 and foo2. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("B") - .AddFooResource(XdsFooResource("foo1", 6)) - .AddFooResource(XdsFooResource("foo2", 7)) - .Serialize()); - // The watcher for foo1 will receive an update even if the resource - // has not changed, since the previous value was removed from the - // cache when we unsubscribed. - resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // For foo2, the watcher should receive notification for the new resource. - resource = watcher2->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo2"); - EXPECT_EQ(resource->value, 7); - // Now we finally tell XdsClient that its previous send_message op is - // complete. - stream->CompleteSendMessageFromClient(); - // XdsClient should send an ACK with the updated subscription list - // (which happens to be identical to the old list), and it should not - // restart the does-not-exist timer. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1", "foo2"}); - stream->CompleteSendMessageFromClient(); - // Make sure the watcher for foo1 does not see a does-not-exist event. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); - // Cancel watches. - CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); - CancelFooWatch(watcher2.get(), "foo2"); - // The XdsClient may or may not send an unsubscription message - // before it closes the transport, depending on callback timing. - request = WaitForRequest(stream.get()); - if (request.has_value()) { - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"B", - /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); - stream->CompleteSendMessageFromClient(); - } -} - -TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) { - // Lower resources-does-not-exist timeout, to make sure that we're not - // triggering that here. - InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResource(); - ASSERT_TRUE(resource.has_value()); - EXPECT_EQ(resource->name, "foo1"); - EXPECT_EQ(resource->value, 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - // Stream fails because of transport disconnection. - stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed")); - // XdsClient should NOT report error to watcher, because we saw a - // response on the stream before it failed. - // XdsClient creates a new stream. - stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Server does NOT send a response. - // We should not see a resource-does-not-exist event, because the - // resource was already cached, so the server can optimize by not - // resending it. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); - // Now server sends a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - // Watcher will not see any update, since the resource is unchanged. - EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc index b77d2ab11e7..e52dd2a047e 100644 --- a/test/core/xds/xds_transport_fake.cc +++ b/test/core/xds/xds_transport_fake.cc @@ -42,20 +42,21 @@ namespace grpc_core { // FakeXdsTransportFactory::FakeStreamingCall // -FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() { - // Can't call event_handler_->OnStatusReceived() or unref event_handler_ - // synchronously, since those operations will trigger code in - // XdsClient that acquires its mutex, but it was already holding its - // mutex when it called us, so it would deadlock. - GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), - status_sent = status_sent_]() mutable { - ExecCtx exec_ctx; - if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); - event_handler.reset(); - }); -} - void FakeXdsTransportFactory::FakeStreamingCall::Orphan() { + { + MutexLock lock(&mu_); + // Can't call event_handler_->OnStatusReceived() or unref event_handler_ + // synchronously, since those operations will trigger code in + // XdsClient that acquires its mutex, but it was already holding its + // mutex when it called us, so it would deadlock. + GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), + status_sent = status_sent_]() mutable { + ExecCtx exec_ctx; + if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); + event_handler.reset(); + }); + status_sent_ = true; + } transport_->RemoveStream(method_, this); Unref(); } diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h index 44ef5a30508..bb44761975c 100644 --- a/test/core/xds/xds_transport_fake.h +++ b/test/core/xds/xds_transport_fake.h @@ -63,8 +63,6 @@ class FakeXdsTransportFactory : public XdsTransportFactory { event_handler_(MakeRefCounted( std::move(event_handler))) {} - ~FakeStreamingCall() override; - void Orphan() override; using StreamingCall::Ref; // Make it public.