diff --git a/src/core/ext/xds/xds_resource_type_impl.h b/src/core/ext/xds/xds_resource_type_impl.h index 5b54ce8ab98..f1b3c5e1f11 100644 --- a/src/core/ext/xds/xds_resource_type_impl.h +++ b/src/core/ext/xds/xds_resource_type_impl.h @@ -35,17 +35,19 @@ namespace grpc_core { template class XdsResourceTypeImpl : public XdsResourceType { public: + using ResourceType = ResourceTypeStruct; + // XdsClient watcher that handles down-casting. class WatcherInterface : public XdsClient::ResourceWatcherInterface { public: - virtual void OnResourceChanged(ResourceTypeStruct listener) = 0; + virtual void OnResourceChanged(ResourceType listener) = 0; private: // Get result from XdsClient generic watcher interface, perform // down-casting, and invoke the caller's OnResourceChanged() method. void OnGenericResourceChanged( const XdsResourceType::ResourceData* resource) override { - OnResourceChanged(*static_cast(resource)); + OnResourceChanged(*static_cast(resource)); } }; @@ -70,14 +72,14 @@ class XdsResourceTypeImpl : public XdsResourceType { bool ResourcesEqual(const ResourceData* r1, const ResourceData* r2) const override { - return *static_cast(r1) == - *static_cast(r2); + return *static_cast(r1) == + *static_cast(r2); } std::unique_ptr CopyResource( const ResourceData* resource) const override { - return std::make_unique( - *static_cast(resource)); + return std::make_unique( + *static_cast(resource)); } }; diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD index e888fe9da1c..533a2a384fa 100644 --- a/test/core/xds/BUILD +++ b/test/core/xds/BUILD @@ -18,6 +18,7 @@ load( "grpc_cc_test", "grpc_package", ) +load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer") grpc_package(name = "test/core/xds") @@ -156,6 +157,35 @@ grpc_cc_test( ], ) +grpc_proto_fuzzer( + name = "xds_client_fuzzer", + srcs = ["xds_client_fuzzer.cc"], + corpus = "xds_client_corpora", + language = "C++", + proto = "xds_client_fuzzer.proto", + proto_deps = [ + "//src/proto/grpc/status:status_proto", + "//src/proto/grpc/testing/xds/v3:discovery_proto", + ], + tags = ["no_windows"], + uses_event_engine = False, + uses_polling = False, + deps = [ + ":xds_transport_fake", + "//src/core:grpc_xds_client", + "//test/core/util:grpc_test_util", + # These proto deps are needed to ensure that we can read these + # resource types out of the google.protobuf.Any fields in the + # textproto files in the corpora. + "//src/proto/grpc/testing/xds/v3:listener_proto", + "//src/proto/grpc/testing/xds/v3:route_proto", + "//src/proto/grpc/testing/xds/v3:cluster_proto", + "//src/proto/grpc/testing/xds/v3:endpoint_proto", + "//src/proto/grpc/testing/xds/v3:http_connection_manager_proto", + "//src/proto/grpc/testing/xds/v3:router_proto", + ], +) + grpc_cc_test( name = "xds_common_types_test", srcs = ["xds_common_types_test.cc"], diff --git a/test/core/xds/xds_client_corpora/basic_cluster b/test/core/xds/xds_client_corpora/basic_cluster new file mode 100644 index 00000000000..9fd3f10dc4e --- /dev/null +++ b/test/core/xds/xds_client_corpora/basic_cluster @@ -0,0 +1,41 @@ +bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}" +actions { + start_watch { + resource_type { + cluster {} + } + resource_name: "cluster1" + } +} +actions { + read_message_from_client { + stream_id { + ads {} + } + ok: true + } +} +actions { + send_message_to_client { + stream_id { + ads {} + } + response { + version_info: "1" + nonce: "A" + type_url: "type.googleapis.com/envoy.config.cluster.v3.Cluster" + resources { + [type.googleapis.com/envoy.config.cluster.v3.Cluster] { + name: "cluster1" + type: EDS + eds_cluster_config { + eds_config { + ads {} + } + service_name: "endpoint1" + } + } + } + } + } +} diff --git a/test/core/xds/xds_client_corpora/basic_endpoint b/test/core/xds/xds_client_corpora/basic_endpoint new file mode 100644 index 00000000000..818fb2b44b1 --- /dev/null +++ b/test/core/xds/xds_client_corpora/basic_endpoint @@ -0,0 +1,57 @@ +bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}" +actions { + start_watch { + resource_type { + endpoint {} + } + resource_name: "endpoint1" + } +} +actions { + read_message_from_client { + stream_id { + ads {} + } + ok: true + } +} +actions { + send_message_to_client { + stream_id { + ads {} + } + response { + version_info: "1" + nonce: "A" + type_url: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" + resources { + [type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment] { + cluster_name: "endpoint1" + endpoints { + locality { + region: "region1" + zone: "zone1" + sub_zone: "sub_zone1" + } + load_balancing_weight { + value: 1 + } + lb_endpoints { + load_balancing_weight { + value: 1 + } + endpoint { + address { + socket_address { + address: "127.0.0.1" + port_value: 443 + } + } + } + } + } + } + } + } + } +} diff --git a/test/core/xds/xds_client_corpora/basic_listener b/test/core/xds/xds_client_corpora/basic_listener new file mode 100644 index 00000000000..e55f73e7278 --- /dev/null +++ b/test/core/xds/xds_client_corpora/basic_listener @@ -0,0 +1,50 @@ +bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}" +actions { + start_watch { + resource_type { + listener {} + } + resource_name: "server.example.com" + } +} +actions { + read_message_from_client { + stream_id { + ads {} + } + ok: true + } +} +actions { + send_message_to_client { + stream_id { + ads {} + } + response { + version_info: "1" + nonce: "A" + type_url: "type.googleapis.com/envoy.config.listener.v3.Listener" + resources { + [type.googleapis.com/envoy.config.listener.v3.Listener] { + name: "server.example.com" + api_listener { + api_listener { + [type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager] { + http_filters { + name: "router" + typed_config { + [type.googleapis.com/envoy.extensions.filters.http.router.v3.Router] {} + } + } + rds { + route_config_name: "route_config" + config_source { self {} } + } + } + } + } + } + } + } + } +} diff --git a/test/core/xds/xds_client_corpora/basic_route_config b/test/core/xds/xds_client_corpora/basic_route_config new file mode 100644 index 00000000000..8585134e15b --- /dev/null +++ b/test/core/xds/xds_client_corpora/basic_route_config @@ -0,0 +1,45 @@ +bootstrap: "{\"xds_servers\": [{\"server_uri\":\"xds.example.com:443\", \"channel_creds\":[{\"type\": \"fake\"}]}]}" +actions { + start_watch { + resource_type { + route_config {} + } + resource_name: "route_config1" + } +} +actions { + read_message_from_client { + stream_id { + ads {} + } + ok: true + } +} +actions { + send_message_to_client { + stream_id { + ads {} + } + response { + version_info: "1" + nonce: "A" + type_url: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration" + resources { + [type.googleapis.com/envoy.config.route.v3.RouteConfiguration] { + name: "route_config1" + virtual_hosts { + domains: "*" + routes { + match { + prefix: "" + } + route { + cluster: "cluster1" + } + } + } + } + } + } + } +} diff --git a/test/core/xds/xds_client_fuzzer.cc b/test/core/xds/xds_client_fuzzer.cc new file mode 100644 index 00000000000..29a1bec320d --- /dev/null +++ b/test/core/xds/xds_client_fuzzer.cc @@ -0,0 +1,294 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "src/core/ext/xds/xds_bootstrap_grpc.h" +#include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_cluster.h" +#include "src/core/ext/xds/xds_endpoint.h" +#include "src/core/ext/xds/xds_listener.h" +#include "src/core/ext/xds/xds_route_config.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/libfuzzer/libfuzzer_macro.h" +#include "test/core/xds/xds_client_fuzzer.pb.h" +#include "test/core/xds/xds_transport_fake.h" + +namespace grpc_core { + +class Fuzzer { + public: + explicit Fuzzer(absl::string_view bootstrap_json) { + auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json); + if (!bootstrap.ok()) { + gpr_log(GPR_ERROR, "error creating bootstrap: %s", + bootstrap.status().ToString().c_str()); + // Leave xds_client_ unset, so Act() will be a no-op. + return; + } + auto transport_factory = MakeOrphanable(); + transport_factory->SetAutoCompleteMessagesFromClient(false); + transport_factory_ = transport_factory.get(); + xds_client_ = MakeRefCounted(std::move(*bootstrap), + std::move(transport_factory)); + } + + void Act(const xds_client_fuzzer::Action& action) { + if (xds_client_ == nullptr) return; + switch (action.action_type_case()) { + case xds_client_fuzzer::Action::kStartWatch: + switch (action.start_watch().resource_type().resource_type_case()) { + case xds_client_fuzzer::ResourceType::kListener: + StartWatch(&listener_watchers_, + action.start_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kRouteConfig: + StartWatch(&route_config_watchers_, + action.start_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kCluster: + StartWatch(&cluster_watchers_, + action.start_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kEndpoint: + StartWatch(&endpoint_watchers_, + action.start_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET: + break; + } + break; + case xds_client_fuzzer::Action::kStopWatch: + switch (action.stop_watch().resource_type().resource_type_case()) { + case xds_client_fuzzer::ResourceType::kListener: + StopWatch(&listener_watchers_, action.stop_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kRouteConfig: + StopWatch(&route_config_watchers_, + action.stop_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kCluster: + StopWatch(&cluster_watchers_, action.stop_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::kEndpoint: + StopWatch(&endpoint_watchers_, action.stop_watch().resource_name()); + break; + case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET: + break; + } + break; + case xds_client_fuzzer::Action::kDumpCsdsData: + xds_client_->DumpClientConfigBinary(); + break; + case xds_client_fuzzer::Action::kTriggerConnectionFailure: + TriggerConnectionFailure( + action.trigger_connection_failure().authority(), + ToAbslStatus(action.trigger_connection_failure().status())); + break; + case xds_client_fuzzer::Action::kReadMessageFromClient: + ReadMessageFromClient(action.read_message_from_client().stream_id(), + action.read_message_from_client().ok()); + break; + case xds_client_fuzzer::Action::kSendMessageToClient: + SendMessageToClient(action.send_message_to_client().stream_id(), + action.send_message_to_client().response()); + break; + case xds_client_fuzzer::Action::kSendStatusToClient: + SendStatusToClient( + action.send_status_to_client().stream_id(), + ToAbslStatus(action.send_status_to_client().status())); + break; + case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET: + break; + } + } + + private: + template + class Watcher : public ResourceTypeType::WatcherInterface { + public: + using ResourceType = ResourceTypeType; + + explicit Watcher(std::string resource_name) + : resource_name_(std::move(resource_name)) {} + + void OnResourceChanged( + typename ResourceType::ResourceType resource) override { + gpr_log(GPR_INFO, "==> OnResourceChanged(%s %s): %s", + std::string(ResourceType::Get()->type_url()).c_str(), + resource_name_.c_str(), resource.ToString().c_str()); + } + + void OnError(absl::Status status) override { + gpr_log(GPR_INFO, "==> OnError(%s %s): %s", + std::string(ResourceType::Get()->type_url()).c_str(), + resource_name_.c_str(), status.ToString().c_str()); + } + + void OnResourceDoesNotExist() override { + gpr_log(GPR_INFO, "==> OnResourceDoesNotExist(%s %s)", + std::string(ResourceType::Get()->type_url()).c_str(), + resource_name_.c_str()); + } + + private: + std::string resource_name_; + }; + + using ListenerWatcher = Watcher; + using RouteConfigWatcher = Watcher; + using ClusterWatcher = Watcher; + using EndpointWatcher = Watcher; + + template + void StartWatch(std::map>* watchers, + std::string resource_name) { + gpr_log(GPR_INFO, "### StartWatch(%s %s)", + std::string(WatcherType::ResourceType::Get()->type_url()).c_str(), + resource_name.c_str()); + auto watcher = MakeRefCounted(resource_name); + (*watchers)[resource_name].insert(watcher.get()); + WatcherType::ResourceType::Get()->StartWatch( + xds_client_.get(), resource_name, std::move(watcher)); + } + + template + void StopWatch(std::map>* watchers, + std::string resource_name) { + gpr_log(GPR_INFO, "### StopWatch(%s %s)", + std::string(WatcherType::ResourceType::Get()->type_url()).c_str(), + resource_name.c_str()); + auto& watchers_set = (*watchers)[resource_name]; + auto it = watchers_set.begin(); + if (it == watchers_set.end()) return; + WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(), + resource_name, *it); + watchers_set.erase(it); + } + + static absl::Status ToAbslStatus(const google::rpc::Status& status) { + return absl::Status(static_cast(status.code()), + status.message()); + } + + const XdsBootstrap::XdsServer* GetServer(const std::string& authority) { + const GrpcXdsBootstrap& bootstrap = + static_cast(xds_client_->bootstrap()); + if (authority.empty()) return &bootstrap.server(); + const auto* authority_entry = + static_cast( + bootstrap.LookupAuthority(authority)); + if (authority_entry == nullptr) return nullptr; + if (authority_entry->server() != nullptr) return authority_entry->server(); + return &bootstrap.server(); + } + + void TriggerConnectionFailure(const std::string& authority, + absl::Status status) { + gpr_log(GPR_INFO, "### TriggerConnectionFailure(%s): %s", authority.c_str(), + status.ToString().c_str()); + const auto* xds_server = GetServer(authority); + if (xds_server == nullptr) return; + transport_factory_->TriggerConnectionFailure(*xds_server, + std::move(status)); + } + + static const char* StreamIdMethod( + const xds_client_fuzzer::StreamId& stream_id) { + switch (stream_id.method_case()) { + case xds_client_fuzzer::StreamId::kAds: + return FakeXdsTransportFactory::kAdsMethod; + case xds_client_fuzzer::StreamId::kLrs: + return FakeXdsTransportFactory::kLrsMethod; + case xds_client_fuzzer::StreamId::METHOD_NOT_SET: + return nullptr; + } + } + + RefCountedPtr GetStream( + const xds_client_fuzzer::StreamId& stream_id) { + const auto* xds_server = GetServer(stream_id.authority()); + if (xds_server == nullptr) return nullptr; + const char* method = StreamIdMethod(stream_id); + if (method == nullptr) return nullptr; + return transport_factory_->WaitForStream(*xds_server, method, + absl::ZeroDuration()); + } + + static std::string StreamIdString( + const xds_client_fuzzer::StreamId& stream_id) { + return absl::StrCat("{authority=\"", stream_id.authority(), + "\", method=", StreamIdMethod(stream_id), "}"); + } + + void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id, + bool ok) { + gpr_log(GPR_INFO, "### ReadMessageFromClient(%s): %s", + StreamIdString(stream_id).c_str(), ok ? "true" : "false"); + auto stream = GetStream(stream_id); + if (stream == nullptr) return; + gpr_log(GPR_INFO, " stream=%p", stream.get()); + auto message = stream->WaitForMessageFromClient(absl::ZeroDuration()); + if (message.has_value()) { + gpr_log(GPR_INFO, " completing send_message"); + stream->CompleteSendMessageFromClient(ok); + } + } + + void SendMessageToClient( + const xds_client_fuzzer::StreamId& stream_id, + const envoy::service::discovery::v3::DiscoveryResponse& response) { + gpr_log(GPR_INFO, "### SendMessageToClient(%s)", + StreamIdString(stream_id).c_str()); + auto stream = GetStream(stream_id); + if (stream == nullptr) return; + gpr_log(GPR_INFO, " stream=%p", stream.get()); + stream->SendMessageToClient(response.SerializeAsString()); + } + + void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id, + absl::Status status) { + gpr_log(GPR_INFO, "### SendStatusToClient(%s): %s", + StreamIdString(stream_id).c_str(), status.ToString().c_str()); + auto stream = GetStream(stream_id); + if (stream == nullptr) return; + gpr_log(GPR_INFO, " stream=%p", stream.get()); + stream->MaybeSendStatusToClient(std::move(status)); + } + + RefCountedPtr xds_client_; + FakeXdsTransportFactory* transport_factory_; + + // Maps of currently active watchers for each resource type, keyed by + // resource name. + std::map> listener_watchers_; + std::map> route_config_watchers_; + std::map> cluster_watchers_; + std::map> endpoint_watchers_; +}; + +} // namespace grpc_core + +bool squelch = true; + +DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Message& message) { + grpc_init(); + grpc_core::Fuzzer fuzzer(message.bootstrap()); + for (int i = 0; i < message.actions_size(); i++) { + fuzzer.Act(message.actions(i)); + } + grpc_shutdown(); +} diff --git a/test/core/xds/xds_client_fuzzer.proto b/test/core/xds/xds_client_fuzzer.proto new file mode 100644 index 00000000000..d475a4bfba6 --- /dev/null +++ b/test/core/xds/xds_client_fuzzer.proto @@ -0,0 +1,109 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +import "src/proto/grpc/status/status.proto"; +import "src/proto/grpc/testing/xds/v3/discovery.proto"; + +package xds_client_fuzzer; + +// +// interactions with XdsClient API +// + +// Use a oneof instead of an enum so that we can ensure that the code +// will fail to build if we add a type here and don't handle it in the +// fuzzer code. +message ResourceType { + message EmptyMessage {} + oneof resource_type { + EmptyMessage listener = 1; + EmptyMessage route_config = 2; + EmptyMessage cluster = 3; + EmptyMessage endpoint = 4; + } +} + +message StartWatch { + ResourceType resource_type = 1; + string resource_name = 2; +} + +message StopWatch { + ResourceType resource_type = 1; + string resource_name = 2; +} + +// TODO(roth): add LRS methods on XdsClient + +message DumpCsdsData {} + +// +// interactions with fake transport +// + +message TriggerConnectionFailure { + string authority = 1; + google.rpc.Status status = 2; +} + +message StreamId { + string authority = 1; + + // Use a oneof instead of an enum so that we can ensure that the code + // will fail to build if we add a type here and don't handle it in the + // fuzzer code. + message EmptyMessage {} + oneof method { + EmptyMessage ads = 2; + EmptyMessage lrs = 3; + } +} + +message ReadMessageFromClient { + StreamId stream_id = 1; + bool ok = 2; +} + +message SendMessageToClient { + StreamId stream_id = 1; + envoy.service.discovery.v3.DiscoveryResponse response = 2; +} + +message SendStatusToClient { + StreamId stream_id = 1; + google.rpc.Status status = 2; +} + +message Action { + oneof action_type { + // interactions with XdsClient API + StartWatch start_watch = 1; + StopWatch stop_watch = 2; + DumpCsdsData dump_csds_data = 3; + // interactions with fake transport + TriggerConnectionFailure trigger_connection_failure = 4; + ReadMessageFromClient read_message_from_client = 5; + SendMessageToClient send_message_to_client = 6; + SendStatusToClient send_status_to_client = 7; + } +} + +message Message { + string bootstrap = 1; + repeated Action actions = 2; +} diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index e767d18309e..5f99e51ed55 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -228,8 +228,6 @@ class XdsClientTest : public ::testing::Test { XdsTestResourceType, ResourceStruct> { public: - using ResourceStructType = ResourceStruct; - // A watcher implementation that queues delivered watches. class Watcher : public XdsResourceTypeImpl< XdsTestResourceType ResponseBuilder& AddResource( - const typename ResourceType::ResourceStructType& resource, + const typename ResourceType::ResourceType& resource, bool in_resource_wrapper = false) { auto* res = response_.add_resources(); *res = ResourceType::EncodeAsAny(resource); diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc index b77d2ab11e7..87e37bc446e 100644 --- a/test/core/xds/xds_transport_fake.cc +++ b/test/core/xds/xds_transport_fake.cc @@ -205,7 +205,7 @@ FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall( // constexpr char FakeXdsTransportFactory::kAdsMethod[]; -constexpr char FakeXdsTransportFactory::kAdsV2Method[]; +constexpr char FakeXdsTransportFactory::kLrsMethod[]; OrphanablePtr FakeXdsTransportFactory::Create( diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h index 44ef5a30508..8a81f51310c 100644 --- a/test/core/xds/xds_transport_fake.h +++ b/test/core/xds/xds_transport_fake.h @@ -49,9 +49,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory { static constexpr char kAdsMethod[] = "/envoy.service.discovery.v3.AggregatedDiscoveryService/" "StreamAggregatedResources"; - static constexpr char kAdsV2Method[] = - "/envoy.service.discovery.v2.AggregatedDiscoveryService/" - "StreamAggregatedResources"; + static constexpr char kLrsMethod[] = + "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"; class FakeStreamingCall : public XdsTransport::StreamingCall { public: