XdsClient: add unit test and fix watcher notification bugs (#30823)
This adds a unit test for XdsClient and fixes several watcher-notification bugs found in the process. Specifically:
- When an ADS stream fails or an xDS channel reports a connectivity failure, report an error only to the watchers for resources being subscribed to on that particular channel, not to watchers on other channels.
- Cache the error status for the channel, so that if a new watcher is started after the channel reports the error, we can immediately report that error to the new watcher.
- If a resource is NACKed and has not been previously cached, or does not exist, report that fact to any new watcher that may be started later.
- If a resource in an ADS response is unparseable but is wrapped in a `Resource` wrapper, we do know its name, so record the validation failure in the cache and report it to the watchers.

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30450/head
parent
66749fd497
commit
bcd8c991e6
13 changed files with 3027 additions and 132 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,230 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2022 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "test/core/xds/xds_transport_fake.h" |
||||||
|
|
||||||
|
#include <functional> |
||||||
|
#include <memory> |
||||||
|
#include <utility> |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
|
||||||
|
#include "src/core/ext/xds/xds_bootstrap.h" |
||||||
|
#include "src/core/lib/event_engine/default_event_engine.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/exec_ctx.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
using grpc_event_engine::experimental::GetDefaultEventEngine; |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
//
|
||||||
|
// FakeXdsTransportFactory::FakeStreamingCall
|
||||||
|
//
|
||||||
|
|
||||||
|
void FakeXdsTransportFactory::FakeStreamingCall::Orphan() { |
||||||
|
{ |
||||||
|
MutexLock lock(&mu_); |
||||||
|
// Can't call event_handler_->OnStatusReceived() or unref event_handler_
|
||||||
|
// synchronously, since those operations will trigger code in
|
||||||
|
// XdsClient that acquires its mutex, but it was already holding its
|
||||||
|
// mutex when it called us, so it would deadlock.
|
||||||
|
GetDefaultEventEngine()->Run([event_handler = std::move(event_handler_), |
||||||
|
status_sent = status_sent_]() mutable { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus()); |
||||||
|
event_handler.reset(); |
||||||
|
}); |
||||||
|
status_sent_ = true; |
||||||
|
} |
||||||
|
transport_->RemoveStream(method_, this); |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
void FakeXdsTransportFactory::FakeStreamingCall::SendMessage( |
||||||
|
std::string payload) { |
||||||
|
MutexLock lock(&mu_); |
||||||
|
from_client_messages_.push_back(std::move(payload)); |
||||||
|
cv_.Signal(); |
||||||
|
// Can't call event_handler_->OnRequestSent() synchronously, since that
|
||||||
|
// operation will trigger code in XdsClient that acquires its mutex, but it
|
||||||
|
// was already holding its mutex when it called us, so it would deadlock.
|
||||||
|
GetDefaultEventEngine()->Run( |
||||||
|
[event_handler = event_handler_->Ref()]() mutable { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
event_handler->OnRequestSent(/*ok=*/true); |
||||||
|
event_handler.reset(); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() { |
||||||
|
MutexLock lock(&mu_); |
||||||
|
return !from_client_messages_.empty(); |
||||||
|
} |
||||||
|
|
||||||
|
// Blocks until a message sent by the client is available, then removes it
// from the queue and returns it.  Returns absl::nullopt if a wait on the
// condition variable times out first.
//
// `timeout` is multiplied by grpc_test_slowdown_factor().  Note that the
// scaled timeout is applied to each individual wait, so a wakeup that finds
// the queue still empty starts a fresh full-length wait.
absl::optional<std::string>
FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient(
    absl::Duration timeout) {
  MutexLock lock(&mu_);
  while (from_client_messages_.empty()) {
    if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
      return absl::nullopt;
    }
  }
  // The front element is discarded immediately below, so move the payload
  // out of the queue instead of copying it.
  std::string payload = std::move(from_client_messages_.front());
  from_client_messages_.pop_front();
  return payload;
}
||||||
|
|
||||||
|
// Delivers `payload` to the client by invoking the event handler's
// OnRecvMessage().  The handler is invoked without holding mu_.
//
// If the call has already been orphaned (Orphan() moves the handler out of
// event_handler_, leaving it null), the message is silently dropped rather
// than dereferencing a null pointer.  A test may still hold a ref to the
// call at that point.
void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
    absl::string_view payload) {
  ExecCtx exec_ctx;
  RefCountedPtr<RefCountedEventHandler> event_handler;
  {
    MutexLock lock(&mu_);
    // Copying the RefCountedPtr takes a new ref when non-null and, unlike
    // event_handler_->Ref(), is safe when event_handler_ is null.
    event_handler = event_handler_;
  }
  if (event_handler == nullptr) return;
  event_handler->OnRecvMessage(payload);
}
||||||
|
|
||||||
|
// Reports `status` to the event handler's OnStatusReceived(), unless a
// status has already been reported for this call (status_sent_ is set), in
// which case this is a no-op.  The handler is invoked without holding mu_.
void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
    absl::Status status) {
  ExecCtx exec_ctx;
  RefCountedPtr<RefCountedEventHandler> handler;
  {
    MutexLock lock(&mu_);
    // Only the first caller to get here takes a ref to the handler; every
    // later caller leaves `handler` null.
    if (!status_sent_) {
      status_sent_ = true;
      handler = event_handler_->Ref();
    }
  }
  if (handler == nullptr) return;
  handler->OnStatusReceived(std::move(status));
}
||||||
|
|
||||||
|
//
|
||||||
|
// FakeXdsTransportFactory::FakeXdsTransport
|
||||||
|
//
|
||||||
|
|
||||||
|
// Simulates a connectivity failure on this transport by running the
// on_connectivity_failure callback with `status`.  The callback is invoked
// without holding mu_.
//
// If the transport has already been orphaned (Orphan() moves the callback
// out of on_connectivity_failure_, leaving it null), this is a no-op.
void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
    absl::Status status) {
  RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure;
  {
    MutexLock lock(&mu_);
    // Copy the RefCountedPtr rather than calling ->Ref() on it: the copy
    // takes a new ref when non-null and is safe when the member is null,
    // whereas ->Ref() would dereference a null pointer after Orphan() and
    // make the null check below unreachable.
    on_connectivity_failure = on_connectivity_failure_;
  }
  ExecCtx exec_ctx;
  if (on_connectivity_failure != nullptr) {
    on_connectivity_failure->Run(std::move(status));
  }
}
||||||
|
|
||||||
|
void FakeXdsTransportFactory::FakeXdsTransport::Orphan() { |
||||||
|
{ |
||||||
|
MutexLock lock(&mu_); |
||||||
|
// Can't destroy on_connectivity_failure_ synchronously, since that
|
||||||
|
// operation will trigger code in XdsClient that acquires its mutex, but
|
||||||
|
// it was already holding its mutex when it called us, so it would deadlock.
|
||||||
|
GetDefaultEventEngine()->Run([on_connectivity_failure = std::move( |
||||||
|
on_connectivity_failure_)]() mutable { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
on_connectivity_failure.reset(); |
||||||
|
}); |
||||||
|
} |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
// Blocks until a streaming call for `method` is registered on this
// transport and returns it.  Returns nullptr if a wait on the condition
// variable times out first.  `timeout` is multiplied by
// grpc_test_slowdown_factor() and applies to each individual wait.
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(
    const char* method, absl::Duration timeout) {
  MutexLock lock(&mu_);
  for (;;) {
    // Re-run the lookup on every pass: the map may have changed while we
    // were waiting with mu_ released.
    auto entry = active_calls_.find(method);
    const bool have_call =
        entry != active_calls_.end() && entry->second != nullptr;
    if (have_call) return entry->second;
    if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) {
      return nullptr;
    }
  }
}
||||||
|
|
||||||
|
void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream( |
||||||
|
const char* method, FakeStreamingCall* call) { |
||||||
|
MutexLock lock(&mu_); |
||||||
|
auto it = active_calls_.find(method); |
||||||
|
if (it != active_calls_.end() && it->second.get() == call) { |
||||||
|
active_calls_.erase(it); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Creates a fake streaming call for `method`, registers it as the active
// call for that method (replacing any previous entry), and wakes a thread
// blocked in WaitForStream().
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
    const char* method,
    std::unique_ptr<StreamingCall::EventHandler> event_handler) {
  // Build the call before taking mu_; its constructor does not touch the
  // transport's state.
  OrphanablePtr<FakeStreamingCall> new_call =
      MakeOrphanable<FakeStreamingCall>(Ref(), method,
                                        std::move(event_handler));
  MutexLock lock(&mu_);
  // The map holds its own ref so that tests can retrieve the call later.
  active_calls_[method] = new_call->Ref();
  cv_.Signal();
  return new_call;
}
||||||
|
|
||||||
|
//
|
||||||
|
// FakeXdsTransportFactory
|
||||||
|
//
|
||||||
|
|
||||||
|
// Out-of-line definitions of the static constexpr members whose values are
// given in the class definition.  These are needed when the members are
// ODR-used in language modes before C++17, where static constexpr data
// members are not implicitly inline.
constexpr char FakeXdsTransportFactory::kAdsMethod[]; |
||||||
|
constexpr char FakeXdsTransportFactory::kAdsV2Method[]; |
||||||
|
|
||||||
|
// Creates a fake transport for `server` and records it in transport_map_
// (keyed by the address of `server`) so that tests can reach it later.
// Asserts that no transport is currently recorded for `server`.  The
// status out-parameter is not used.
OrphanablePtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::Create(
    const XdsBootstrap::XdsServer& server,
    std::function<void(absl::Status)> on_connectivity_failure,
    absl::Status* /*status*/) {
  OrphanablePtr<FakeXdsTransport> new_transport =
      MakeOrphanable<FakeXdsTransport>(std::move(on_connectivity_failure));
  MutexLock lock(&mu_);
  // operator[] creates a null slot on first use; a non-null slot means a
  // transport was already created for this server.
  RefCountedPtr<FakeXdsTransport>& entry = transport_map_[&server];
  GPR_ASSERT(entry == nullptr);
  entry = new_transport->Ref();
  return new_transport;
}
||||||
|
|
||||||
|
void FakeXdsTransportFactory::TriggerConnectionFailure( |
||||||
|
const XdsBootstrap::XdsServer& server, absl::Status status) { |
||||||
|
auto transport = GetTransport(server); |
||||||
|
transport->TriggerConnectionFailure(std::move(status)); |
||||||
|
} |
||||||
|
|
||||||
|
// Waits for a streaming call for `method` to exist on the transport that
// was created for `server`, forwarding to that transport's WaitForStream().
// GetTransport() asserts that such a transport exists.
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
                                       const char* method,
                                       absl::Duration timeout) {
  return GetTransport(server)->WaitForStream(method, timeout);
}
||||||
|
|
||||||
|
// Returns a ref to the transport recorded for `server` (keyed by the
// address of `server`).  Asserts that a non-null transport is recorded.
RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
  MutexLock lock(&mu_);
  RefCountedPtr<FakeXdsTransport> transport;
  // Use find() rather than operator[] so that a lookup for an unknown
  // server does not insert a null entry into the map.
  auto it = transport_map_.find(&server);
  if (it != transport_map_.end()) transport = it->second;
  GPR_ASSERT(transport != nullptr);
  return transport;
}
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,180 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2022 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H |
||||||
|
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <deque> |
||||||
|
#include <functional> |
||||||
|
#include <memory> |
||||||
|
#include <string> |
||||||
|
|
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/types/optional.h" |
||||||
|
|
||||||
|
#include "src/core/ext/xds/xds_bootstrap.h" |
||||||
|
#include "src/core/ext/xds/xds_transport.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/gprpp/sync.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
class FakeXdsTransportFactory : public XdsTransportFactory { |
||||||
|
private: |
||||||
|
class FakeXdsTransport; |
||||||
|
|
||||||
|
public: |
||||||
|
static constexpr char kAdsMethod[] = |
||||||
|
"/envoy.service.discovery.v3.AggregatedDiscoveryService/" |
||||||
|
"StreamAggregatedResources"; |
||||||
|
static constexpr char kAdsV2Method[] = |
||||||
|
"/envoy.service.discovery.v2.AggregatedDiscoveryService/" |
||||||
|
"StreamAggregatedResources"; |
||||||
|
|
||||||
|
class FakeStreamingCall : public XdsTransport::StreamingCall { |
||||||
|
public: |
||||||
|
FakeStreamingCall( |
||||||
|
RefCountedPtr<FakeXdsTransport> transport, const char* method, |
||||||
|
std::unique_ptr<StreamingCall::EventHandler> event_handler) |
||||||
|
: transport_(std::move(transport)), |
||||||
|
method_(method), |
||||||
|
event_handler_(MakeRefCounted<RefCountedEventHandler>( |
||||||
|
std::move(event_handler))) {} |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
using StreamingCall::Ref; // Make it public.
|
||||||
|
|
||||||
|
bool HaveMessageFromClient(); |
||||||
|
absl::optional<std::string> WaitForMessageFromClient( |
||||||
|
absl::Duration timeout); |
||||||
|
|
||||||
|
void SendMessageToClient(absl::string_view payload); |
||||||
|
void MaybeSendStatusToClient(absl::Status status); |
||||||
|
|
||||||
|
private: |
||||||
|
class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> { |
||||||
|
public: |
||||||
|
explicit RefCountedEventHandler( |
||||||
|
std::unique_ptr<StreamingCall::EventHandler> event_handler) |
||||||
|
: event_handler_(std::move(event_handler)) {} |
||||||
|
|
||||||
|
void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); } |
||||||
|
void OnRecvMessage(absl::string_view payload) { |
||||||
|
event_handler_->OnRecvMessage(payload); |
||||||
|
} |
||||||
|
void OnStatusReceived(absl::Status status) { |
||||||
|
event_handler_->OnStatusReceived(std::move(status)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
std::unique_ptr<StreamingCall::EventHandler> event_handler_; |
||||||
|
}; |
||||||
|
|
||||||
|
void SendMessage(std::string payload) override; |
||||||
|
|
||||||
|
RefCountedPtr<FakeXdsTransport> transport_; |
||||||
|
const char* method_; |
||||||
|
|
||||||
|
Mutex mu_; |
||||||
|
CondVar cv_; |
||||||
|
RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_); |
||||||
|
std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_); |
||||||
|
bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; |
||||||
|
}; |
||||||
|
|
||||||
|
FakeXdsTransportFactory() = default; |
||||||
|
|
||||||
|
using XdsTransportFactory::Ref; // Make it public.
|
||||||
|
|
||||||
|
void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, |
||||||
|
absl::Status status); |
||||||
|
|
||||||
|
RefCountedPtr<FakeStreamingCall> WaitForStream( |
||||||
|
const XdsBootstrap::XdsServer& server, const char* method, |
||||||
|
absl::Duration timeout); |
||||||
|
|
||||||
|
void Orphan() override { Unref(); } |
||||||
|
|
||||||
|
private: |
||||||
|
class FakeXdsTransport : public XdsTransport { |
||||||
|
public: |
||||||
|
explicit FakeXdsTransport( |
||||||
|
std::function<void(absl::Status)> on_connectivity_failure) |
||||||
|
: on_connectivity_failure_( |
||||||
|
MakeRefCounted<RefCountedOnConnectivityFailure>( |
||||||
|
std::move(on_connectivity_failure))) {} |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
using XdsTransport::Ref; // Make it public.
|
||||||
|
|
||||||
|
void TriggerConnectionFailure(absl::Status status); |
||||||
|
|
||||||
|
RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method, |
||||||
|
absl::Duration timeout); |
||||||
|
|
||||||
|
void RemoveStream(const char* method, FakeStreamingCall* call); |
||||||
|
|
||||||
|
private: |
||||||
|
class RefCountedOnConnectivityFailure |
||||||
|
: public RefCounted<RefCountedOnConnectivityFailure> { |
||||||
|
public: |
||||||
|
explicit RefCountedOnConnectivityFailure( |
||||||
|
std::function<void(absl::Status)> on_connectivity_failure) |
||||||
|
: on_connectivity_failure_(std::move(on_connectivity_failure)) {} |
||||||
|
|
||||||
|
void Run(absl::Status status) { |
||||||
|
on_connectivity_failure_(std::move(status)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
std::function<void(absl::Status)> on_connectivity_failure_; |
||||||
|
}; |
||||||
|
|
||||||
|
OrphanablePtr<StreamingCall> CreateStreamingCall( |
||||||
|
const char* method, |
||||||
|
std::unique_ptr<StreamingCall::EventHandler> event_handler) override; |
||||||
|
|
||||||
|
void ResetBackoff() override {} |
||||||
|
|
||||||
|
Mutex mu_; |
||||||
|
CondVar cv_; |
||||||
|
RefCountedPtr<RefCountedOnConnectivityFailure> on_connectivity_failure_ |
||||||
|
ABSL_GUARDED_BY(&mu_); |
||||||
|
std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>> |
||||||
|
active_calls_ ABSL_GUARDED_BY(&mu_); |
||||||
|
}; |
||||||
|
|
||||||
|
OrphanablePtr<XdsTransport> Create( |
||||||
|
const XdsBootstrap::XdsServer& server, |
||||||
|
std::function<void(absl::Status)> on_connectivity_failure, |
||||||
|
absl::Status* status) override; |
||||||
|
|
||||||
|
RefCountedPtr<FakeXdsTransport> GetTransport( |
||||||
|
const XdsBootstrap::XdsServer& server); |
||||||
|
|
||||||
|
Mutex mu_; |
||||||
|
std::map<const XdsBootstrap::XdsServer*, RefCountedPtr<FakeXdsTransport>> |
||||||
|
transport_map_ ABSL_GUARDED_BY(&mu_); |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_FAKE_H
|
Loading…
Reference in new issue