[fuzzing] Define a common fuzzing interface and move API fuzzer to it (#32853)

Requires cherrypick for grpc_fuzzer.bzl file.
pull/32878/head
Vignesh Babu 2 years ago committed by GitHub
parent bd940c0dd5
commit a2c89d0b24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      test/core/end2end/fuzzers/BUILD
  2. 875
      test/core/end2end/fuzzers/api_fuzzer.cc
  3. 113
      test/core/end2end/fuzzers/fuzzing_common.cc
  4. 92
      test/core/end2end/fuzzers/fuzzing_common.h
  5. 29
      test/core/util/grpc_fuzzer.bzl

@ -12,24 +12,42 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_package") load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_package", "grpc_proto_library")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_fuzzer", "grpc_proto_fuzzer") load("//test/core/util:grpc_fuzzer.bzl", "grpc_fuzzer", "grpc_proto_fuzzer")
grpc_package(name = "test/core/end2end/fuzzers") grpc_package(name = "test/core/end2end/fuzzers")
licenses(["notice"]) licenses(["notice"])
grpc_proto_library(
name = "api_fuzzer_proto",
srcs = ["api_fuzzer.proto"],
has_services = False,
deps = [
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/util:fuzz_config_vars_proto",
],
)
grpc_cc_library(
name = "fuzzing_common",
srcs = ["fuzzing_common.cc"],
hdrs = ["fuzzing_common.h"],
external_deps = ["absl/strings"],
deps = [
"api_fuzzer_proto",
"//:gpr",
"//:grpc",
],
)
grpc_proto_fuzzer( grpc_proto_fuzzer(
name = "api_fuzzer", name = "api_fuzzer",
size = "enormous", size = "enormous",
srcs = ["api_fuzzer.cc"], srcs = ["api_fuzzer.cc"],
corpus = "api_fuzzer_corpus", corpus = "api_fuzzer_corpus",
language = "C++", language = "C++",
proto = "api_fuzzer.proto", proto = None,
proto_deps = [
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/util:fuzz_config_vars_proto",
],
tags = [ tags = [
"no_mac", "no_mac",
"no_windows", "no_windows",
@ -37,6 +55,8 @@ grpc_proto_fuzzer(
uses_event_engine = False, uses_event_engine = False,
uses_polling = False, uses_polling = False,
deps = [ deps = [
"api_fuzzer_proto",
"fuzzing_common",
"//:gpr", "//:gpr",
"//:grpc", "//:grpc",
"//src/core:channel_args", "//src/core:channel_args",

@ -73,6 +73,7 @@
#include "src/libfuzzer/libfuzzer_macro.h" #include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/data/ssl_test_data.h" #include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h" #include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
#include "test/core/end2end/fuzzers/fuzzing_common.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/fuzz_config_vars.h" #include "test/core/util/fuzz_config_vars.h"
@ -95,15 +96,6 @@ bool leak_check = true;
static void dont_log(gpr_log_func_args* /*args*/) {} static void dont_log(gpr_log_func_args* /*args*/) {}
////////////////////////////////////////////////////////////////////////////////
// global state
static grpc_server* g_server;
static grpc_channel* g_channel;
static grpc_resource_quota* g_resource_quota;
static std::vector<grpc_passthru_endpoint_channel_action> g_channel_actions;
static std::atomic<bool> g_channel_force_delete{false};
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// dns resolution // dns resolution
@ -249,6 +241,73 @@ grpc_ares_request* my_dns_lookup_ares(
static void my_cancel_ares_request(grpc_ares_request* request) { static void my_cancel_ares_request(grpc_ares_request* request) {
GPR_ASSERT(request == nullptr); GPR_ASSERT(request == nullptr);
} }
////////////////////////////////////////////////////////////////////////////////
// globals
class Call;
namespace grpc {
namespace testing {
class ApiFuzzer : public BasicFuzzer {
public:
ApiFuzzer();
~ApiFuzzer() override;
Call* ActiveCall();
bool Continue();
void TryShutdown();
void Tick();
grpc_server* Server() { return server_; }
const std::vector<grpc_passthru_endpoint_channel_action>& ChannelActions() {
return channel_actions_;
}
private:
Result PollCq() override;
Result CreateChannel(
const api_fuzzer::CreateChannel& create_channel) override;
Result CloseChannel() override;
Result CreateServer(const api_fuzzer::CreateServer& create_server) override;
Result ShutdownServer() override;
Result CancelAllCallsIfShutdown() override;
Result DestroyServerIfReady() override;
Result CheckConnectivity(bool try_to_connect) override;
Result WatchConnectivity(uint32_t duration_us) override;
Result CreateCall(const api_fuzzer::CreateCall& create_call) override;
Result QueueBatchForActiveCall(const api_fuzzer::Batch& queue_batch) override;
Result ChangeActiveCall() override;
Result CancelActiveCall() override;
Result SendPingOnChannel() override;
Result ServerRequestCall() override;
Result DestroyActiveCall() override;
Result ValidatePeerForActiveCall() override;
Result ValidateChannelTarget() override;
Result ResizeResourceQuota(uint32_t resize_resource_quota) override;
std::shared_ptr<FuzzingEventEngine> engine_;
grpc_completion_queue* cq_ = nullptr;
grpc_server* server_ = nullptr;
grpc_channel* channel_ = nullptr;
grpc_resource_quota* resource_quota_;
std::vector<grpc_passthru_endpoint_channel_action> channel_actions_;
std::atomic<bool> channel_force_delete_{false};
std::vector<std::shared_ptr<Call>> calls_;
size_t active_call_ = 0;
bool server_shutdown_ = false;
int pending_server_shutdowns_ = 0;
int pending_channel_watches_ = 0;
int pending_pings_ = 0;
};
} // namespace testing
} // namespace grpc
namespace {
grpc::testing::ApiFuzzer* g_api_fuzzer = nullptr;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// client connection // client connection
@ -262,15 +321,16 @@ typedef struct {
} future_connect; } future_connect;
static void do_connect(future_connect fc) { static void do_connect(future_connect fc) {
if (g_server != nullptr) { if (g_api_fuzzer->Server() != nullptr) {
grpc_endpoint* client; grpc_endpoint* client;
grpc_endpoint* server; grpc_endpoint* server;
grpc_passthru_endpoint_create(&client, &server, nullptr, true); grpc_passthru_endpoint_create(&client, &server, nullptr, true);
*fc.ep = client; *fc.ep = client;
start_scheduling_grpc_passthru_endpoint_channel_effects(client, start_scheduling_grpc_passthru_endpoint_channel_effects(
g_channel_actions); client, g_api_fuzzer->ChannelActions());
grpc_core::Server* core_server = grpc_core::Server::FromC(g_server); grpc_core::Server* core_server =
grpc_core::Server::FromC(g_api_fuzzer->Server());
grpc_transport* transport = grpc_create_chttp2_transport( grpc_transport* transport = grpc_create_chttp2_transport(
core_server->channel_args(), server, false); core_server->channel_args(), server, false);
GPR_ASSERT(GRPC_LOG_IF_ERROR( GPR_ASSERT(GRPC_LOG_IF_ERROR(
@ -520,8 +580,8 @@ class Call : public std::enable_shared_from_this<Call> {
case api_fuzzer::BatchOp::kReceiveMessage: case api_fuzzer::BatchOp::kReceiveMessage:
// Allow only one active pending_recv_message_op to exist. Otherwise if // Allow only one active pending_recv_message_op to exist. Otherwise if
// the previous enqueued recv_message_op is not complete by the time // the previous enqueued recv_message_op is not complete by the time
// we get here, then under certain conditions, enqueing this op will // we get here, then under certain conditions, enqueuing this op will
// over-write the internal call->receiving_buffer maintained by grpc // overwrite the internal call->receiving_buffer maintained by grpc
// leading to a memory leak. // leading to a memory leak.
if (call_closed_ || pending_recv_message_op_) { if (call_closed_ || pending_recv_message_op_) {
*batch_is_ok = false; *batch_is_ok = false;
@ -613,22 +673,6 @@ class Call : public std::enable_shared_from_this<Call> {
std::vector<grpc_slice> unref_slices_; std::vector<grpc_slice> unref_slices_;
}; };
static std::vector<std::shared_ptr<Call>> g_calls;
static size_t g_active_call = 0;
static Call* ActiveCall() {
while (!g_calls.empty()) {
if (g_active_call >= g_calls.size()) {
g_active_call = 0;
}
if (g_calls[g_active_call] != nullptr && !g_calls[g_active_call]->done()) {
return g_calls[g_active_call].get();
}
g_calls.erase(g_calls.begin() + g_active_call);
}
return nullptr;
}
Call::~Call() { Call::~Call() {
if (call_ != nullptr) { if (call_ != nullptr) {
grpc_call_unref(call_); grpc_call_unref(call_);
@ -653,7 +697,8 @@ Call::~Call() {
} }
template <typename ChannelArgContainer> template <typename ChannelArgContainer>
grpc_channel_args* ReadArgs(const ChannelArgContainer& args) { grpc_channel_args* ReadArgs(grpc_resource_quota* resource_quota,
const ChannelArgContainer& args) {
grpc_channel_args* res = grpc_channel_args* res =
static_cast<grpc_channel_args*>(gpr_malloc(sizeof(grpc_channel_args))); static_cast<grpc_channel_args*>(gpr_malloc(sizeof(grpc_channel_args)));
res->args = res->args =
@ -671,9 +716,9 @@ grpc_channel_args* ReadArgs(const ChannelArgContainer& args) {
break; break;
case api_fuzzer::ChannelArg::kResourceQuota: case api_fuzzer::ChannelArg::kResourceQuota:
if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue; if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue;
grpc_resource_quota_ref(g_resource_quota); grpc_resource_quota_ref(resource_quota);
res->args[j].type = GRPC_ARG_POINTER; res->args[j].type = GRPC_ARG_POINTER;
res->args[j].value.pointer.p = g_resource_quota; res->args[j].value.pointer.p = resource_quota;
res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable(); res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable();
break; break;
case api_fuzzer::ChannelArg::VALUE_NOT_SET: case api_fuzzer::ChannelArg::VALUE_NOT_SET:
@ -802,19 +847,11 @@ static grpc_channel_credentials* ReadChannelCreds(
} }
} }
DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { namespace grpc {
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { namespace testing {
gpr_set_log_function(dont_log);
} ApiFuzzer::ApiFuzzer() {
if (msg.has_config_vars()) { engine_ =
grpc_core::ApplyFuzzConfigVars(msg.config_vars());
}
grpc_event_engine::experimental::SetEventEngineFactory(
[actions = msg.event_engine_actions()]() {
return std::make_unique<FuzzingEventEngine>(
FuzzingEventEngine::Options(), actions);
});
auto engine =
std::dynamic_pointer_cast<FuzzingEventEngine>(GetDefaultEventEngine()); std::dynamic_pointer_cast<FuzzingEventEngine>(GetDefaultEventEngine());
grpc_init(); grpc_init();
grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
@ -824,392 +861,404 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_core::Executor::SetThreadingAll(false); grpc_core::Executor::SetThreadingAll(false);
} }
grpc_core::ResetDNSResolver( grpc_core::ResetDNSResolver(
std::make_unique<FuzzerDNSResolver>(engine.get())); std::make_unique<FuzzerDNSResolver>(engine_.get()));
grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_dns_lookup_hostname_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request; grpc_cancel_ares_request = my_cancel_ares_request;
GPR_ASSERT(g_channel == nullptr); GPR_ASSERT(channel_ == nullptr);
GPR_ASSERT(g_server == nullptr); GPR_ASSERT(server_ == nullptr);
bool server_shutdown = false; resource_quota_ = grpc_resource_quota_create("api_fuzzer");
int pending_server_shutdowns = 0; cq_ = grpc_completion_queue_create_for_next(nullptr);
int pending_channel_watches = 0; }
int pending_pings = 0;
g_resource_quota = grpc_resource_quota_create("api_fuzzer"); Call* ApiFuzzer::ActiveCall() {
while (!calls_.empty()) {
if (active_call_ >= calls_.size()) {
active_call_ = 0;
}
if (calls_[active_call_] != nullptr && !calls_[active_call_]->done()) {
return calls_[active_call_].get();
}
calls_.erase(calls_.begin() + active_call_);
}
return nullptr;
}
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); bool ApiFuzzer::Continue() {
return channel_ != nullptr || server_ != nullptr ||
pending_channel_watches_ > 0 || pending_pings_ > 0 ||
ActiveCall() != nullptr;
}
int action_index = 0; void ApiFuzzer::TryShutdown() {
auto no_more_actions = [&]() { action_index = msg.actions_size(); }; engine_->FuzzingDone();
auto poll_cq = [&]() -> bool { if (channel_ != nullptr) {
grpc_event ev = grpc_completion_queue_next( grpc_channel_destroy(channel_);
cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); channel_ = nullptr;
switch (ev.type) { }
case GRPC_OP_COMPLETE: { if (server_ != nullptr) {
static_cast<Validator*>(ev.tag)->Run(ev.success); if (!server_shutdown_) {
break; grpc_server_shutdown_and_notify(
} server_, cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_));
case GRPC_QUEUE_TIMEOUT: server_shutdown_ = true;
break; pending_server_shutdowns_++;
case GRPC_QUEUE_SHUTDOWN: } else if (pending_server_shutdowns_ == 0) {
return true; grpc_server_destroy(server_);
server_ = nullptr;
} }
return false; }
}; for (auto& call : calls_) {
if (call == nullptr) continue;
if (call->type() == CallType::PENDING_SERVER) continue;
call->Shutdown();
}
while (action_index < msg.actions_size() || g_channel != nullptr || grpc_timer_manager_tick();
g_server != nullptr || pending_channel_watches > 0 || GPR_ASSERT(PollCq() == Result::kPending);
pending_pings > 0 || ActiveCall() != nullptr) { }
engine->Tick();
if (action_index == msg.actions_size()) { ApiFuzzer::~ApiFuzzer() {
engine->FuzzingDone(); GPR_ASSERT(channel_ == nullptr);
if (g_channel != nullptr) { GPR_ASSERT(server_ == nullptr);
grpc_channel_destroy(g_channel); GPR_ASSERT(ActiveCall() == nullptr);
g_channel = nullptr; GPR_ASSERT(calls_.empty());
}
if (g_server != nullptr) {
if (!server_shutdown) {
grpc_server_shutdown_and_notify(
g_server, cq,
AssertSuccessAndDecrement(&pending_server_shutdowns));
server_shutdown = true;
pending_server_shutdowns++;
} else if (pending_server_shutdowns == 0) {
grpc_server_destroy(g_server);
g_server = nullptr;
}
}
for (auto& call : g_calls) {
if (call == nullptr) continue;
if (call->type() == CallType::PENDING_SERVER) continue;
call->Shutdown();
}
grpc_timer_manager_tick(); grpc_completion_queue_shutdown(cq_);
GPR_ASSERT(!poll_cq()); GPR_ASSERT(PollCq() == Result::kComplete);
continue; grpc_completion_queue_destroy(cq_);
grpc_resource_quota_unref(resource_quota_);
grpc_shutdown_blocking();
engine_->UnsetGlobalHooks();
}
void ApiFuzzer::Tick() {
engine_->Tick();
grpc_timer_manager_tick();
if (channel_force_delete_.exchange(false) && channel_) {
grpc_channel_destroy(channel_);
channel_ = nullptr;
channel_actions_.clear();
}
}
ApiFuzzer::Result ApiFuzzer::PollCq() {
grpc_event ev = grpc_completion_queue_next(
cq_, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
switch (ev.type) {
case GRPC_OP_COMPLETE: {
static_cast<Validator*>(ev.tag)->Run(ev.success);
break;
} }
case GRPC_QUEUE_TIMEOUT:
break;
case GRPC_QUEUE_SHUTDOWN:
return Result::kComplete;
}
return Result::kPending;
}
ApiFuzzer::Result ApiFuzzer::CreateChannel(
const api_fuzzer::CreateChannel& create_channel) {
if (!create_channel.channel_actions_size() || channel_ != nullptr) {
return Result::kFailed;
} else {
grpc_channel_args* args =
ReadArgs(resource_quota_, create_channel.channel_args());
grpc_channel_credentials* creds =
create_channel.has_channel_creds()
? ReadChannelCreds(create_channel.channel_creds())
: grpc_insecure_credentials_create();
channel_ =
grpc_channel_create(create_channel.target().c_str(), creds, args);
grpc_channel_credentials_release(creds);
channel_actions_.clear();
for (int i = 0; i < create_channel.channel_actions_size(); i++) {
const api_fuzzer::ChannelAction& channel_action =
create_channel.channel_actions(i);
channel_actions_.push_back({
std::min(channel_action.wait_ms(), kMaxWaitMs),
std::min(channel_action.add_n_bytes_writable(),
kMaxAddNWritableBytes),
std::min(channel_action.add_n_bytes_readable(),
kMaxAddNReadableBytes),
});
}
GPR_ASSERT(channel_ != nullptr);
channel_force_delete_ = false;
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
}
}
return Result::kComplete;
}
grpc_timer_manager_tick(); ApiFuzzer::Result ApiFuzzer::CloseChannel() {
if (channel_ != nullptr) {
grpc_channel_destroy(channel_);
channel_ = nullptr;
} else {
return Result::kFailed;
}
return Result::kComplete;
}
if (g_channel_force_delete.exchange(false) && g_channel) { ApiFuzzer::Result ApiFuzzer::CreateServer(
grpc_channel_destroy(g_channel); const api_fuzzer::CreateServer& create_server) {
g_channel = nullptr; if (server_ == nullptr) {
g_channel_actions.clear(); grpc_channel_args* args =
ReadArgs(resource_quota_, create_server.channel_args());
server_ = grpc_server_create(args, nullptr);
GPR_ASSERT(server_ != nullptr);
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
} }
grpc_server_register_completion_queue(server_, cq_, nullptr);
grpc_server_start(server_);
server_shutdown_ = false;
GPR_ASSERT(pending_server_shutdowns_ == 0);
} else {
return Result::kFailed;
}
return Result::kComplete;
}
const api_fuzzer::Action& action = msg.actions(action_index); ApiFuzzer::Result ApiFuzzer::ShutdownServer() {
action_index++; if (server_ != nullptr) {
switch (action.type_case()) { grpc_server_shutdown_and_notify(
case api_fuzzer::Action::TYPE_NOT_SET: server_, cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_));
no_more_actions(); pending_server_shutdowns_++;
break; server_shutdown_ = true;
// tickle completion queue } else {
case api_fuzzer::Action::kPollCq: { return Result::kFailed;
GPR_ASSERT(!poll_cq()); }
break; return Result::kComplete;
} }
// create an insecure channel
case api_fuzzer::Action::kCreateChannel: {
if (!action.create_channel().channel_actions_size() ||
g_channel != nullptr) {
no_more_actions();
} else {
grpc_channel_args* args =
ReadArgs(action.create_channel().channel_args());
grpc_channel_credentials* creds =
action.create_channel().has_channel_creds()
? ReadChannelCreds(action.create_channel().channel_creds())
: grpc_insecure_credentials_create();
g_channel = grpc_channel_create(
action.create_channel().target().c_str(), creds, args);
grpc_channel_credentials_release(creds);
g_channel_actions.clear();
for (int i = 0; i < action.create_channel().channel_actions_size();
i++) {
const api_fuzzer::ChannelAction& channel_action =
action.create_channel().channel_actions(i);
g_channel_actions.push_back({
std::min(channel_action.wait_ms(), kMaxWaitMs),
std::min(channel_action.add_n_bytes_writable(),
kMaxAddNWritableBytes),
std::min(channel_action.add_n_bytes_readable(),
kMaxAddNReadableBytes),
});
}
GPR_ASSERT(g_channel != nullptr);
g_channel_force_delete = false;
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
}
}
break;
}
// destroy a channel
case api_fuzzer::Action::kCloseChannel: {
if (g_channel != nullptr) {
grpc_channel_destroy(g_channel);
g_channel = nullptr;
} else {
no_more_actions();
}
break;
}
// bring up a server
case api_fuzzer::Action::kCreateServer: {
if (g_server == nullptr) {
grpc_channel_args* args =
ReadArgs(action.create_server().channel_args());
g_server = grpc_server_create(args, nullptr);
GPR_ASSERT(g_server != nullptr);
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(args);
}
grpc_server_register_completion_queue(g_server, cq, nullptr);
grpc_server_start(g_server);
server_shutdown = false;
GPR_ASSERT(pending_server_shutdowns == 0);
} else {
no_more_actions();
}
break;
}
// begin server shutdown
case api_fuzzer::Action::kShutdownServer: {
if (g_server != nullptr) {
grpc_server_shutdown_and_notify(
g_server, cq,
AssertSuccessAndDecrement(&pending_server_shutdowns));
pending_server_shutdowns++;
server_shutdown = true;
} else {
no_more_actions();
}
break;
}
// cancel all calls if shutdown
case api_fuzzer::Action::kCancelAllCallsIfShutdown: {
if (g_server != nullptr && server_shutdown) {
grpc_server_cancel_all_calls(g_server);
} else {
no_more_actions();
}
break;
}
// destroy server
case api_fuzzer::Action::kDestroyServerIfReady: {
if (g_server != nullptr && server_shutdown &&
pending_server_shutdowns == 0) {
grpc_server_destroy(g_server);
g_server = nullptr;
} else {
no_more_actions();
}
break;
}
// check connectivity
case api_fuzzer::Action::kCheckConnectivity: {
if (g_channel != nullptr) {
grpc_channel_check_connectivity_state(g_channel,
action.check_connectivity());
} else {
no_more_actions();
}
break;
}
// watch connectivity
case api_fuzzer::Action::kWatchConnectivity: {
if (g_channel != nullptr) {
grpc_connectivity_state st =
grpc_channel_check_connectivity_state(g_channel, 0);
if (st != GRPC_CHANNEL_SHUTDOWN) {
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(action.watch_connectivity(),
GPR_TIMESPAN));
grpc_channel_watch_connectivity_state(
g_channel, st, deadline, cq,
ValidateConnectivityWatch(deadline, &pending_channel_watches));
pending_channel_watches++;
}
} else {
no_more_actions();
}
break;
}
// create a call
case api_fuzzer::Action::kCreateCall: {
bool ok = true;
if (g_channel == nullptr) ok = false;
// If the active call is a server call, then use it as the parent call
// to exercise the propagation logic.
Call* parent_call = ActiveCall();
if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
parent_call = nullptr;
}
g_calls.emplace_back(new Call(CallType::CLIENT));
grpc_slice method =
g_calls.back()->ReadSlice(action.create_call().method());
if (GRPC_SLICE_LENGTH(method) == 0) {
ok = false;
}
grpc_slice host =
g_calls.back()->ReadSlice(action.create_call().host());
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(action.create_call().timeout(), GPR_TIMESPAN));
if (ok) {
g_calls.back()->SetCall(grpc_channel_create_call(
g_channel, parent_call == nullptr ? nullptr : parent_call->call(),
action.create_call().propagation_mask(), cq, method, &host,
deadline, nullptr));
} else {
g_calls.pop_back();
no_more_actions();
}
break;
}
// switch the 'current' call
case api_fuzzer::Action::kChangeActiveCall: {
g_active_call++;
ActiveCall();
break;
}
// queue some ops on a call
case api_fuzzer::Action::kQueueBatch: {
auto* active_call = ActiveCall();
if (active_call == nullptr ||
active_call->type() == CallType::PENDING_SERVER ||
active_call->call() == nullptr) {
no_more_actions();
break;
}
const auto& batch = action.queue_batch().operations();
if (batch.size() > 6) {
no_more_actions();
break;
}
std::vector<grpc_op> ops;
bool ok = true;
uint8_t has_ops = 0;
std::vector<std::function<void()>> unwinders;
for (const auto& batch_op : batch) {
auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
if (!op.has_value()) continue;
ops.push_back(*op);
}
if (g_channel == nullptr) ok = false; ApiFuzzer::Result ApiFuzzer::CancelAllCallsIfShutdown() {
if (ok) { if (server_ != nullptr && server_shutdown_) {
auto* v = active_call->FinishedBatchValidator(has_ops); grpc_server_cancel_all_calls(server_);
grpc_call_error error = grpc_call_start_batch( } else {
active_call->call(), ops.data(), ops.size(), v, nullptr); return Result::kFailed;
if (error != GRPC_CALL_OK) { }
v->Run(false); return Result::kComplete;
} }
} else {
no_more_actions(); ApiFuzzer::Result ApiFuzzer::DestroyServerIfReady() {
for (auto& unwind : unwinders) { if (server_ != nullptr && server_shutdown_ &&
unwind(); pending_server_shutdowns_ == 0) {
} grpc_server_destroy(server_);
} server_ = nullptr;
break; } else {
} return Result::kFailed;
// cancel current call }
case api_fuzzer::Action::kCancelCall: { return Result::kComplete;
auto* active_call = ActiveCall(); }
if (active_call != nullptr && active_call->call() != nullptr) {
grpc_call_cancel(active_call->call(), nullptr); ApiFuzzer::Result ApiFuzzer::CheckConnectivity(bool try_to_connect) {
} else { if (channel_ != nullptr) {
no_more_actions(); grpc_channel_check_connectivity_state(channel_, try_to_connect);
} } else {
break; return Result::kFailed;
} }
// get a calls peer return Result::kComplete;
case api_fuzzer::Action::kGetPeer: { }
auto* active_call = ActiveCall();
if (active_call != nullptr && active_call->call() != nullptr) { ApiFuzzer::Result ApiFuzzer::WatchConnectivity(uint32_t duration_us) {
free_non_null(grpc_call_get_peer(active_call->call())); if (channel_ != nullptr) {
} else { grpc_connectivity_state st =
no_more_actions(); grpc_channel_check_connectivity_state(channel_, 0);
} if (st != GRPC_CHANNEL_SHUTDOWN) {
break; gpr_timespec deadline =
} gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
// get a channels target gpr_time_from_micros(duration_us, GPR_TIMESPAN));
case api_fuzzer::Action::kGetTarget: { grpc_channel_watch_connectivity_state(
if (g_channel != nullptr) { channel_, st, deadline, cq_,
free_non_null(grpc_channel_get_target(g_channel)); ValidateConnectivityWatch(deadline, &pending_channel_watches_));
} else { pending_channel_watches_++;
no_more_actions();
}
break;
}
// send a ping on a channel
case api_fuzzer::Action::kPing: {
if (g_channel != nullptr) {
pending_pings++;
grpc_channel_ping(g_channel, cq, Decrement(&pending_pings), nullptr);
} else {
no_more_actions();
}
break;
}
// enable a tracer
case api_fuzzer::Action::kEnableTracer: {
grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
break;
}
// disable a tracer
case api_fuzzer::Action::kDisableTracer: {
grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
break;
}
// request a server call
case api_fuzzer::Action::kRequestCall: {
if (g_server == nullptr) {
no_more_actions();
break;
}
g_calls.emplace_back(new Call(CallType::PENDING_SERVER));
g_calls.back()->RequestCall(g_server, cq);
break;
}
// destroy a call
case api_fuzzer::Action::kDestroyCall: {
auto* active_call = ActiveCall();
if (active_call != nullptr &&
active_call->type() != CallType::PENDING_SERVER &&
active_call->call() != nullptr) {
g_calls[g_active_call]->Shutdown();
} else {
no_more_actions();
}
break;
}
// resize the buffer pool
case api_fuzzer::Action::kResizeResourceQuota: {
grpc_resource_quota_resize(g_resource_quota,
action.resize_resource_quota());
break;
}
} }
} else {
return Result::kFailed;
} }
return Result::kComplete;
}
GPR_ASSERT(g_channel == nullptr); ApiFuzzer::Result ApiFuzzer::CreateCall(
GPR_ASSERT(g_server == nullptr); const api_fuzzer::CreateCall& create_call) {
GPR_ASSERT(ActiveCall() == nullptr); bool ok = true;
GPR_ASSERT(g_calls.empty()); if (channel_ == nullptr) ok = false;
// If the active call is a server call, then use it as the parent call
// to exercise the propagation logic.
Call* parent_call = ActiveCall();
if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
parent_call = nullptr;
}
calls_.emplace_back(new Call(CallType::CLIENT));
grpc_slice method = calls_.back()->ReadSlice(create_call.method());
if (GRPC_SLICE_LENGTH(method) == 0) {
ok = false;
}
grpc_slice host = calls_.back()->ReadSlice(create_call.host());
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(create_call.timeout(), GPR_TIMESPAN));
if (ok) {
calls_.back()->SetCall(grpc_channel_create_call(
channel_, parent_call == nullptr ? nullptr : parent_call->call(),
create_call.propagation_mask(), cq_, method, &host, deadline, nullptr));
} else {
calls_.pop_back();
return Result::kFailed;
}
return Result::kComplete;
}
grpc_completion_queue_shutdown(cq); ApiFuzzer::Result ApiFuzzer::ChangeActiveCall() {
GPR_ASSERT(poll_cq()); active_call_++;
grpc_completion_queue_destroy(cq); ActiveCall();
return Result::kComplete;
}
grpc_resource_quota_unref(g_resource_quota); ApiFuzzer::Result ApiFuzzer::QueueBatchForActiveCall(
grpc_shutdown_blocking(); const api_fuzzer::Batch& queue_batch) {
engine->UnsetGlobalHooks(); auto* active_call = ActiveCall();
if (active_call == nullptr ||
active_call->type() == CallType::PENDING_SERVER ||
active_call->call() == nullptr) {
return Result::kFailed;
}
const auto& batch = queue_batch.operations();
if (batch.size() > 6) {
return Result::kFailed;
}
std::vector<grpc_op> ops;
bool ok = true;
uint8_t has_ops = 0;
std::vector<std::function<void()>> unwinders;
for (const auto& batch_op : batch) {
auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
if (!op.has_value()) continue;
ops.push_back(*op);
}
if (channel_ == nullptr) ok = false;
if (ok) {
auto* v = active_call->FinishedBatchValidator(has_ops);
grpc_call_error error = grpc_call_start_batch(
active_call->call(), ops.data(), ops.size(), v, nullptr);
if (error != GRPC_CALL_OK) {
v->Run(false);
}
} else {
for (auto& unwind : unwinders) {
unwind();
}
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::CancelActiveCall() {
auto* active_call = ActiveCall();
if (active_call != nullptr && active_call->call() != nullptr) {
grpc_call_cancel(active_call->call(), nullptr);
} else {
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::SendPingOnChannel() {
if (channel_ != nullptr) {
pending_pings_++;
grpc_channel_ping(channel_, cq_, Decrement(&pending_pings_), nullptr);
} else {
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::ServerRequestCall() {
if (server_ == nullptr) {
return Result::kFailed;
}
calls_.emplace_back(new Call(CallType::PENDING_SERVER));
calls_.back()->RequestCall(server_, cq_);
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::DestroyActiveCall() {
auto* active_call = ActiveCall();
if (active_call != nullptr &&
active_call->type() != CallType::PENDING_SERVER &&
active_call->call() != nullptr) {
calls_[active_call_]->Shutdown();
} else {
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::ValidatePeerForActiveCall() {
auto* active_call = ActiveCall();
if (active_call != nullptr && active_call->call() != nullptr) {
free_non_null(grpc_call_get_peer(active_call->call()));
} else {
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::ValidateChannelTarget() {
if (channel_ != nullptr) {
free_non_null(grpc_channel_get_target(channel_));
} else {
return Result::kFailed;
}
return Result::kComplete;
}
ApiFuzzer::Result ApiFuzzer::ResizeResourceQuota(
uint32_t resize_resource_quota) {
grpc_resource_quota_resize(resource_quota_, resize_resource_quota);
return Result::kComplete;
}
} // namespace testing
} // namespace grpc
DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
if (msg.has_config_vars()) {
grpc_core::ApplyFuzzConfigVars(msg.config_vars());
}
grpc_event_engine::experimental::SetEventEngineFactory(
[actions = msg.event_engine_actions()]() {
return std::make_unique<FuzzingEventEngine>(
FuzzingEventEngine::Options(), actions);
});
g_api_fuzzer = new grpc::testing::ApiFuzzer();
int action_index = 0;
auto no_more_actions = [&]() { action_index = msg.actions_size(); };
while (action_index < msg.actions_size() || g_api_fuzzer->Continue()) {
g_api_fuzzer->Tick();
if (action_index == msg.actions_size()) {
g_api_fuzzer->TryShutdown();
continue;
}
auto result = g_api_fuzzer->ExecuteAction(msg.actions(action_index++));
if (result == grpc::testing::ApiFuzzer::Result::kFailed) {
no_more_actions();
}
}
delete g_api_fuzzer;
} }

@ -0,0 +1,113 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "test/core/end2end/fuzzers/fuzzing_common.h"
#include <string>
#include "absl/strings/str_cat.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/crash.h"
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
namespace grpc {
namespace testing {
BasicFuzzer::Result BasicFuzzer::ExecuteAction(
const api_fuzzer::Action& action) {
switch (action.type_case()) {
case api_fuzzer::Action::TYPE_NOT_SET:
return BasicFuzzer::Result::kFailed;
// tickle completion queue
case api_fuzzer::Action::kPollCq:
return PollCq();
// create an insecure channel
case api_fuzzer::Action::kCreateChannel:
return CreateChannel(action.create_channel());
// destroy a channel
case api_fuzzer::Action::kCloseChannel:
return CloseChannel();
// bring up a server
case api_fuzzer::Action::kCreateServer:
return CreateServer(action.create_server());
// begin server shutdown
case api_fuzzer::Action::kShutdownServer:
return ShutdownServer();
// cancel all calls if server is shutdown
case api_fuzzer::Action::kCancelAllCallsIfShutdown:
return CancelAllCallsIfShutdown();
// destroy server
case api_fuzzer::Action::kDestroyServerIfReady:
return DestroyServerIfReady();
// check connectivity
case api_fuzzer::Action::kCheckConnectivity:
return CheckConnectivity(action.check_connectivity());
// watch connectivity
case api_fuzzer::Action::kWatchConnectivity:
return WatchConnectivity(action.watch_connectivity());
// create a call
case api_fuzzer::Action::kCreateCall:
return CreateCall(action.create_call());
// switch the 'current' call
case api_fuzzer::Action::kChangeActiveCall:
return ChangeActiveCall();
// queue some ops on a call
case api_fuzzer::Action::kQueueBatch:
return QueueBatchForActiveCall(action.queue_batch());
// cancel current call
case api_fuzzer::Action::kCancelCall:
return CancelActiveCall();
// get a calls peer
case api_fuzzer::Action::kGetPeer:
return ValidatePeerForActiveCall();
// get a channels target
case api_fuzzer::Action::kGetTarget:
return ValidateChannelTarget();
// send a ping on a channel
case api_fuzzer::Action::kPing:
return SendPingOnChannel();
// enable a tracer
case api_fuzzer::Action::kEnableTracer: {
grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
break;
}
// disable a tracer
case api_fuzzer::Action::kDisableTracer: {
grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
break;
}
// request a server call
case api_fuzzer::Action::kRequestCall:
return ServerRequestCall();
// destroy a call
case api_fuzzer::Action::kDestroyCall:
return DestroyActiveCall();
// resize the buffer pool
case api_fuzzer::Action::kResizeResourceQuota:
return ResizeResourceQuota(action.resize_resource_quota());
default:
grpc_core::Crash(absl::StrCat("Unsupported Fuzzing Action of type: ",
action.type_case()));
}
return BasicFuzzer::Result::kComplete;
}
} // namespace testing
} // namespace grpc

@ -0,0 +1,92 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H
#define GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H
#include <stdint.h>
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
namespace grpc {
namespace testing {
class BasicFuzzer {
public:
enum Result { kPending = 0, kComplete = 1, kFailed = 2, kNotSupported = 3 };
virtual Result ExecuteAction(const api_fuzzer::Action& action);
virtual ~BasicFuzzer() = default;
private:
// Poll any created completion queue to drive the RPC forward.
virtual Result PollCq() = 0;
// Channel specific actions.
// Create an active channel with the specified parameters.
virtual Result CreateChannel(
const api_fuzzer::CreateChannel& create_channel) = 0;
// Check whether the channel is connected and optionally try to connect if it
// is not connected.
virtual Result CheckConnectivity(bool try_to_connect) = 0;
// Watch whether the channel connects within the specified duration.
virtual Result WatchConnectivity(uint32_t duration_us) = 0;
// Verify that the channel target can be reliably queried.
virtual Result ValidateChannelTarget() = 0;
// Send a http ping on the channel.
virtual Result SendPingOnChannel() = 0;
// Close the active channel.
virtual Result CloseChannel() = 0;
// Server specific actions
// Create an active server.
virtual Result CreateServer(
const api_fuzzer::CreateServer& create_server) = 0;
// Request to be notified of a new RPC on the active server.
virtual Result ServerRequestCall() = 0;
// Shutdown the active server.
virtual Result ShutdownServer() = 0;
// Destroy the active server.
virtual Result DestroyServerIfReady() = 0;
// Call specific actions.
// Create a call on the active channel with the specified parameters. Also add
// it the list of managed calls.
virtual Result CreateCall(const api_fuzzer::CreateCall& create_call) = 0;
// Choose a different active call from the list of managed calls.
virtual Result ChangeActiveCall() = 0;
// Queue a batch of operations to be executed on the active call.
virtual Result QueueBatchForActiveCall(
const api_fuzzer::Batch& queue_batch) = 0;
// Cancel the active call.
virtual Result CancelActiveCall() = 0;
// Cancel all managed calls.
virtual Result CancelAllCallsIfShutdown() = 0;
// Validate that the peer can be reliably queried for the active call.
virtual Result ValidatePeerForActiveCall() = 0;
// Cancel and destroy the active call.
virtual Result DestroyActiveCall() = 0;
// Other actions.
// Change the resource quota limits.
virtual Result ResizeResourceQuota(uint32_t resize_resource_quota) = 0;
};
} // namespace testing
} // namespace grpc
#endif // GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H

@ -59,8 +59,11 @@ def grpc_proto_fuzzer(name, corpus, proto, proto_deps = [], external_deps = [],
Args: Args:
name: The name of the test. name: The name of the test.
corpus: The corpus for the test. corpus: The corpus for the test.
proto: The proto for the test. proto: The proto for the test. If empty, it assumes the proto dependency
proto_deps: Deps for proto. is already included in the target deps. Otherwise it creates a
new grpc_proto_library with name "_{name}_proto" and makes the
fuzz target depend on it.
proto_deps: Deps for proto. Only used if proto is not empty.
external_deps: External deps. external_deps: External deps.
srcs: The source files for the test. srcs: The source files for the test.
deps: The dependencies of the test. deps: The dependencies of the test.
@ -69,24 +72,24 @@ def grpc_proto_fuzzer(name, corpus, proto, proto_deps = [], external_deps = [],
tags: The tags for the test. tags: The tags for the test.
**kwargs: Other arguments to supply to the test. **kwargs: Other arguments to supply to the test.
""" """
PROTO_LIBRARY = "_%s_proto" % name
CORPUS_DIR = native.package_name() + "/" + corpus CORPUS_DIR = native.package_name() + "/" + corpus
deps = deps + ["@com_google_libprotobuf_mutator//:libprotobuf_mutator"]
grpc_proto_library( if proto != None:
name = PROTO_LIBRARY, PROTO_LIBRARY = "_%s_proto" % name
srcs = [proto], grpc_proto_library(
deps = proto_deps, name = PROTO_LIBRARY,
has_services = False, srcs = [proto],
) deps = proto_deps,
has_services = False,
)
deps = deps + [PROTO_LIBRARY]
grpc_cc_test( grpc_cc_test(
name = name, name = name,
srcs = srcs, srcs = srcs,
tags = tags + ["grpc-fuzzer", "no-cache"], tags = tags + ["grpc-fuzzer", "no-cache"],
deps = deps + [ deps = deps + select({
"@com_google_libprotobuf_mutator//:libprotobuf_mutator",
PROTO_LIBRARY,
] + select({
"//:grpc_build_fuzzers": [], "//:grpc_build_fuzzers": [],
"//conditions:default": ["//test/core/util:fuzzer_corpus_test"], "//conditions:default": ["//test/core/util:fuzzer_corpus_test"],
}), }),

Loading…
Cancel
Save