Second attempt: XdsClient: implement fuzzer (#31606)

* Revert "Revert "XdsClient: implement fuzzer (#31560)" (#31604)"

This reverts commit 58b298f354.

* avoid dependency on status.proto
pull/31613/head
Mark D. Roth 2 years ago committed by GitHub
parent a638c407bb
commit 6c56fe6326
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      src/core/ext/xds/xds_resource_type_impl.h
  2. 29
      test/core/xds/BUILD
  3. 41
      test/core/xds/xds_client_corpora/basic_cluster
  4. 57
      test/core/xds/xds_client_corpora/basic_endpoint
  5. 50
      test/core/xds/xds_client_corpora/basic_listener
  6. 45
      test/core/xds/xds_client_corpora/basic_route_config
  7. 294
      test/core/xds/xds_client_fuzzer.cc
  8. 116
      test/core/xds/xds_client_fuzzer.proto
  9. 4
      test/core/xds/xds_client_test.cc
  10. 2
      test/core/xds/xds_transport_fake.cc
  11. 5
      test/core/xds/xds_transport_fake.h

@ -35,17 +35,19 @@ namespace grpc_core {
template <typename Subclass, typename ResourceTypeStruct>
class XdsResourceTypeImpl : public XdsResourceType {
public:
using ResourceType = ResourceTypeStruct;
// XdsClient watcher that handles down-casting.
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnResourceChanged(ResourceTypeStruct listener) = 0;
virtual void OnResourceChanged(ResourceType listener) = 0;
private:
// Get result from XdsClient generic watcher interface, perform
// down-casting, and invoke the caller's OnResourceChanged() method.
void OnGenericResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnResourceChanged(*static_cast<const ResourceTypeStruct*>(resource));
OnResourceChanged(*static_cast<const ResourceType*>(resource));
}
};
@ -70,14 +72,14 @@ class XdsResourceTypeImpl : public XdsResourceType {
bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const override {
return *static_cast<const ResourceTypeStruct*>(r1) ==
*static_cast<const ResourceTypeStruct*>(r2);
return *static_cast<const ResourceType*>(r1) ==
*static_cast<const ResourceType*>(r2);
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
return std::make_unique<ResourceTypeStruct>(
*static_cast<const ResourceTypeStruct*>(resource));
return std::make_unique<ResourceType>(
*static_cast<const ResourceType*>(resource));
}
};

@ -18,6 +18,7 @@ load(
"grpc_cc_test",
"grpc_package",
)
load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
grpc_package(name = "test/core/xds")
@ -156,6 +157,34 @@ grpc_cc_test(
],
)
grpc_proto_fuzzer(
name = "xds_client_fuzzer",
srcs = ["xds_client_fuzzer.cc"],
corpus = "xds_client_corpora",
language = "C++",
proto = "xds_client_fuzzer.proto",
proto_deps = [
"//src/proto/grpc/testing/xds/v3:discovery_proto",
],
tags = ["no_windows"],
uses_event_engine = False,
uses_polling = False,
deps = [
":xds_transport_fake",
"//src/core:grpc_xds_client",
"//test/core/util:grpc_test_util",
# These proto deps are needed to ensure that we can read these
# resource types out of the google.protobuf.Any fields in the
# textproto files in the corpora.
"//src/proto/grpc/testing/xds/v3:listener_proto",
"//src/proto/grpc/testing/xds/v3:route_proto",
"//src/proto/grpc/testing/xds/v3:cluster_proto",
"//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//src/proto/grpc/testing/xds/v3:http_connection_manager_proto",
"//src/proto/grpc/testing/xds/v3:router_proto",
],
)
grpc_cc_test(
name = "xds_common_types_test",
srcs = ["xds_common_types_test.cc"],

@ -0,0 +1,41 @@
bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}"
actions {
start_watch {
resource_type {
cluster {}
}
resource_name: "cluster1"
}
}
actions {
read_message_from_client {
stream_id {
ads {}
}
ok: true
}
}
actions {
send_message_to_client {
stream_id {
ads {}
}
response {
version_info: "1"
nonce: "A"
type_url: "type.googleapis.com/envoy.config.cluster.v3.Cluster"
resources {
[type.googleapis.com/envoy.config.cluster.v3.Cluster] {
name: "cluster1"
type: EDS
eds_cluster_config {
eds_config {
ads {}
}
service_name: "endpoint1"
}
}
}
}
}
}

@ -0,0 +1,57 @@
bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}"
actions {
start_watch {
resource_type {
endpoint {}
}
resource_name: "endpoint1"
}
}
actions {
read_message_from_client {
stream_id {
ads {}
}
ok: true
}
}
actions {
send_message_to_client {
stream_id {
ads {}
}
response {
version_info: "1"
nonce: "A"
type_url: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
resources {
[type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment] {
cluster_name: "endpoint1"
endpoints {
locality {
region: "region1"
zone: "zone1"
sub_zone: "sub_zone1"
}
load_balancing_weight {
value: 1
}
lb_endpoints {
load_balancing_weight {
value: 1
}
endpoint {
address {
socket_address {
address: "127.0.0.1"
port_value: 443
}
}
}
}
}
}
}
}
}
}

@ -0,0 +1,50 @@
bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}"
actions {
start_watch {
resource_type {
listener {}
}
resource_name: "server.example.com"
}
}
actions {
read_message_from_client {
stream_id {
ads {}
}
ok: true
}
}
actions {
send_message_to_client {
stream_id {
ads {}
}
response {
version_info: "1"
nonce: "A"
type_url: "type.googleapis.com/envoy.config.listener.v3.Listener"
resources {
[type.googleapis.com/envoy.config.listener.v3.Listener] {
name: "server.example.com"
api_listener {
api_listener {
[type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager] {
http_filters {
name: "router"
typed_config {
[type.googleapis.com/envoy.extensions.filters.http.router.v3.Router] {}
}
}
rds {
route_config_name: "route_config"
config_source { self {} }
}
}
}
}
}
}
}
}
}

@ -0,0 +1,45 @@
bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}"
actions {
start_watch {
resource_type {
route_config {}
}
resource_name: "route_config1"
}
}
actions {
read_message_from_client {
stream_id {
ads {}
}
ok: true
}
}
actions {
send_message_to_client {
stream_id {
ads {}
}
response {
version_info: "1"
nonce: "A"
type_url: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
resources {
[type.googleapis.com/envoy.config.route.v3.RouteConfiguration] {
name: "route_config1"
virtual_hosts {
domains: "*"
routes {
match {
prefix: ""
}
route {
cluster: "cluster1"
}
}
}
}
}
}
}
}

@ -0,0 +1,294 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/log.h>
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/xds/xds_client_fuzzer.pb.h"
#include "test/core/xds/xds_transport_fake.h"
namespace grpc_core {
class Fuzzer {
public:
explicit Fuzzer(absl::string_view bootstrap_json) {
auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
if (!bootstrap.ok()) {
gpr_log(GPR_ERROR, "error creating bootstrap: %s",
bootstrap.status().ToString().c_str());
// Leave xds_client_ unset, so Act() will be a no-op.
return;
}
auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>();
transport_factory->SetAutoCompleteMessagesFromClient(false);
transport_factory_ = transport_factory.get();
xds_client_ = MakeRefCounted<XdsClient>(std::move(*bootstrap),
std::move(transport_factory));
}
void Act(const xds_client_fuzzer::Action& action) {
if (xds_client_ == nullptr) return;
switch (action.action_type_case()) {
case xds_client_fuzzer::Action::kStartWatch:
switch (action.start_watch().resource_type().resource_type_case()) {
case xds_client_fuzzer::ResourceType::kListener:
StartWatch(&listener_watchers_,
action.start_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kRouteConfig:
StartWatch(&route_config_watchers_,
action.start_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kCluster:
StartWatch(&cluster_watchers_,
action.start_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kEndpoint:
StartWatch(&endpoint_watchers_,
action.start_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
break;
}
break;
case xds_client_fuzzer::Action::kStopWatch:
switch (action.stop_watch().resource_type().resource_type_case()) {
case xds_client_fuzzer::ResourceType::kListener:
StopWatch(&listener_watchers_, action.stop_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kRouteConfig:
StopWatch(&route_config_watchers_,
action.stop_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kCluster:
StopWatch(&cluster_watchers_, action.stop_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::kEndpoint:
StopWatch(&endpoint_watchers_, action.stop_watch().resource_name());
break;
case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
break;
}
break;
case xds_client_fuzzer::Action::kDumpCsdsData:
xds_client_->DumpClientConfigBinary();
break;
case xds_client_fuzzer::Action::kTriggerConnectionFailure:
TriggerConnectionFailure(
action.trigger_connection_failure().authority(),
ToAbslStatus(action.trigger_connection_failure().status()));
break;
case xds_client_fuzzer::Action::kReadMessageFromClient:
ReadMessageFromClient(action.read_message_from_client().stream_id(),
action.read_message_from_client().ok());
break;
case xds_client_fuzzer::Action::kSendMessageToClient:
SendMessageToClient(action.send_message_to_client().stream_id(),
action.send_message_to_client().response());
break;
case xds_client_fuzzer::Action::kSendStatusToClient:
SendStatusToClient(
action.send_status_to_client().stream_id(),
ToAbslStatus(action.send_status_to_client().status()));
break;
case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET:
break;
}
}
private:
template <typename ResourceTypeType>
class Watcher : public ResourceTypeType::WatcherInterface {
public:
using ResourceType = ResourceTypeType;
explicit Watcher(std::string resource_name)
: resource_name_(std::move(resource_name)) {}
void OnResourceChanged(
typename ResourceType::ResourceType resource) override {
gpr_log(GPR_INFO, "==> OnResourceChanged(%s %s): %s",
std::string(ResourceType::Get()->type_url()).c_str(),
resource_name_.c_str(), resource.ToString().c_str());
}
void OnError(absl::Status status) override {
gpr_log(GPR_INFO, "==> OnError(%s %s): %s",
std::string(ResourceType::Get()->type_url()).c_str(),
resource_name_.c_str(), status.ToString().c_str());
}
void OnResourceDoesNotExist() override {
gpr_log(GPR_INFO, "==> OnResourceDoesNotExist(%s %s)",
std::string(ResourceType::Get()->type_url()).c_str(),
resource_name_.c_str());
}
private:
std::string resource_name_;
};
using ListenerWatcher = Watcher<XdsListenerResourceType>;
using RouteConfigWatcher = Watcher<XdsRouteConfigResourceType>;
using ClusterWatcher = Watcher<XdsClusterResourceType>;
using EndpointWatcher = Watcher<XdsEndpointResourceType>;
template <typename WatcherType>
void StartWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
std::string resource_name) {
gpr_log(GPR_INFO, "### StartWatch(%s %s)",
std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
resource_name.c_str());
auto watcher = MakeRefCounted<WatcherType>(resource_name);
(*watchers)[resource_name].insert(watcher.get());
WatcherType::ResourceType::Get()->StartWatch(
xds_client_.get(), resource_name, std::move(watcher));
}
template <typename WatcherType>
void StopWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
std::string resource_name) {
gpr_log(GPR_INFO, "### StopWatch(%s %s)",
std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
resource_name.c_str());
auto& watchers_set = (*watchers)[resource_name];
auto it = watchers_set.begin();
if (it == watchers_set.end()) return;
WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(),
resource_name, *it);
watchers_set.erase(it);
}
static absl::Status ToAbslStatus(const xds_client_fuzzer::Status& status) {
return absl::Status(static_cast<absl::StatusCode>(status.code()),
status.message());
}
const XdsBootstrap::XdsServer* GetServer(const std::string& authority) {
const GrpcXdsBootstrap& bootstrap =
static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap());
if (authority.empty()) return &bootstrap.server();
const auto* authority_entry =
static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
bootstrap.LookupAuthority(authority));
if (authority_entry == nullptr) return nullptr;
if (authority_entry->server() != nullptr) return authority_entry->server();
return &bootstrap.server();
}
void TriggerConnectionFailure(const std::string& authority,
absl::Status status) {
gpr_log(GPR_INFO, "### TriggerConnectionFailure(%s): %s", authority.c_str(),
status.ToString().c_str());
const auto* xds_server = GetServer(authority);
if (xds_server == nullptr) return;
transport_factory_->TriggerConnectionFailure(*xds_server,
std::move(status));
}
static const char* StreamIdMethod(
const xds_client_fuzzer::StreamId& stream_id) {
switch (stream_id.method_case()) {
case xds_client_fuzzer::StreamId::kAds:
return FakeXdsTransportFactory::kAdsMethod;
case xds_client_fuzzer::StreamId::kLrs:
return FakeXdsTransportFactory::kLrsMethod;
case xds_client_fuzzer::StreamId::METHOD_NOT_SET:
return nullptr;
}
}
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> GetStream(
const xds_client_fuzzer::StreamId& stream_id) {
const auto* xds_server = GetServer(stream_id.authority());
if (xds_server == nullptr) return nullptr;
const char* method = StreamIdMethod(stream_id);
if (method == nullptr) return nullptr;
return transport_factory_->WaitForStream(*xds_server, method,
absl::ZeroDuration());
}
static std::string StreamIdString(
const xds_client_fuzzer::StreamId& stream_id) {
return absl::StrCat("{authority=\"", stream_id.authority(),
"\", method=", StreamIdMethod(stream_id), "}");
}
void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id,
bool ok) {
gpr_log(GPR_INFO, "### ReadMessageFromClient(%s): %s",
StreamIdString(stream_id).c_str(), ok ? "true" : "false");
auto stream = GetStream(stream_id);
if (stream == nullptr) return;
gpr_log(GPR_INFO, " stream=%p", stream.get());
auto message = stream->WaitForMessageFromClient(absl::ZeroDuration());
if (message.has_value()) {
gpr_log(GPR_INFO, " completing send_message");
stream->CompleteSendMessageFromClient(ok);
}
}
void SendMessageToClient(
const xds_client_fuzzer::StreamId& stream_id,
const envoy::service::discovery::v3::DiscoveryResponse& response) {
gpr_log(GPR_INFO, "### SendMessageToClient(%s)",
StreamIdString(stream_id).c_str());
auto stream = GetStream(stream_id);
if (stream == nullptr) return;
gpr_log(GPR_INFO, " stream=%p", stream.get());
stream->SendMessageToClient(response.SerializeAsString());
}
void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id,
absl::Status status) {
gpr_log(GPR_INFO, "### SendStatusToClient(%s): %s",
StreamIdString(stream_id).c_str(), status.ToString().c_str());
auto stream = GetStream(stream_id);
if (stream == nullptr) return;
gpr_log(GPR_INFO, " stream=%p", stream.get());
stream->MaybeSendStatusToClient(std::move(status));
}
RefCountedPtr<XdsClient> xds_client_;
FakeXdsTransportFactory* transport_factory_;
// Maps of currently active watchers for each resource type, keyed by
// resource name.
std::map<std::string, std::set<ListenerWatcher*>> listener_watchers_;
std::map<std::string, std::set<RouteConfigWatcher*>> route_config_watchers_;
std::map<std::string, std::set<ClusterWatcher*>> cluster_watchers_;
std::map<std::string, std::set<EndpointWatcher*>> endpoint_watchers_;
};
} // namespace grpc_core
bool squelch = true;
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Message& message) {
grpc_init();
grpc_core::Fuzzer fuzzer(message.bootstrap());
for (int i = 0; i < message.actions_size(); i++) {
fuzzer.Act(message.actions(i));
}
grpc_shutdown();
}

@ -0,0 +1,116 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
syntax = "proto3";
import "src/proto/grpc/testing/xds/v3/discovery.proto";
package xds_client_fuzzer;
// We'd ideally like to use google.rpc.Status instead of creating our
// own proto for this, but that winds up causing all sorts of dependency
// headaches.
message Status {
int32 code = 1;
string message = 2;
}
//
// interactions with XdsClient API
//
// Use a oneof instead of an enum so that we can ensure that the code
// will fail to build if we add a type here and don't handle it in the
// fuzzer code.
message ResourceType {
message EmptyMessage {}
oneof resource_type {
EmptyMessage listener = 1;
EmptyMessage route_config = 2;
EmptyMessage cluster = 3;
EmptyMessage endpoint = 4;
}
}
message StartWatch {
ResourceType resource_type = 1;
string resource_name = 2;
}
message StopWatch {
ResourceType resource_type = 1;
string resource_name = 2;
}
// TODO(roth): add LRS methods on XdsClient
message DumpCsdsData {}
//
// interactions with fake transport
//
message TriggerConnectionFailure {
string authority = 1;
Status status = 2;
}
message StreamId {
string authority = 1;
// Use a oneof instead of an enum so that we can ensure that the code
// will fail to build if we add a type here and don't handle it in the
// fuzzer code.
message EmptyMessage {}
oneof method {
EmptyMessage ads = 2;
EmptyMessage lrs = 3;
}
}
message ReadMessageFromClient {
StreamId stream_id = 1;
bool ok = 2;
}
message SendMessageToClient {
StreamId stream_id = 1;
envoy.service.discovery.v3.DiscoveryResponse response = 2;
}
message SendStatusToClient {
StreamId stream_id = 1;
Status status = 2;
}
message Action {
oneof action_type {
// interactions with XdsClient API
StartWatch start_watch = 1;
StopWatch stop_watch = 2;
DumpCsdsData dump_csds_data = 3;
// interactions with fake transport
TriggerConnectionFailure trigger_connection_failure = 4;
ReadMessageFromClient read_message_from_client = 5;
SendMessageToClient send_message_to_client = 6;
SendStatusToClient send_status_to_client = 7;
}
}
message Message {
string bootstrap = 1;
repeated Action actions = 2;
}

@ -228,8 +228,6 @@ class XdsClientTest : public ::testing::Test {
XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
ResourceStruct> {
public:
using ResourceStructType = ResourceStruct;
// A watcher implementation that queues delivered watches.
class Watcher : public XdsResourceTypeImpl<
XdsTestResourceType<ResourceStruct,
@ -494,7 +492,7 @@ class XdsClientTest : public ::testing::Test {
template <typename ResourceType>
ResponseBuilder& AddResource(
const typename ResourceType::ResourceStructType& resource,
const typename ResourceType::ResourceType& resource,
bool in_resource_wrapper = false) {
auto* res = response_.add_resources();
*res = ResourceType::EncodeAsAny(resource);

@ -205,7 +205,7 @@ FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
//
constexpr char FakeXdsTransportFactory::kAdsMethod[];
constexpr char FakeXdsTransportFactory::kAdsV2Method[];
constexpr char FakeXdsTransportFactory::kLrsMethod[];
OrphanablePtr<XdsTransportFactory::XdsTransport>
FakeXdsTransportFactory::Create(

@ -49,9 +49,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
static constexpr char kAdsMethod[] =
"/envoy.service.discovery.v3.AggregatedDiscoveryService/"
"StreamAggregatedResources";
static constexpr char kAdsV2Method[] =
"/envoy.service.discovery.v2.AggregatedDiscoveryService/"
"StreamAggregatedResources";
static constexpr char kLrsMethod[] =
"/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
class FakeStreamingCall : public XdsTransport::StreamingCall {
public:

Loading…
Cancel
Save