From a2c89d0b24ccdf86aa4b96572116d6eb3eb001bd Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Fri, 14 Apr 2023 13:41:13 -0700 Subject: [PATCH] [fuzzing] Define a common fuzzing interface and move API fuzzer to it (#32853) Requires cherrypick for grpc_fuzzer.bzl file. --- test/core/end2end/fuzzers/BUILD | 32 +- test/core/end2end/fuzzers/api_fuzzer.cc | 875 +++++++++++--------- test/core/end2end/fuzzers/fuzzing_common.cc | 113 +++ test/core/end2end/fuzzers/fuzzing_common.h | 92 ++ test/core/util/grpc_fuzzer.bzl | 29 +- 5 files changed, 709 insertions(+), 432 deletions(-) create mode 100644 test/core/end2end/fuzzers/fuzzing_common.cc create mode 100644 test/core/end2end/fuzzers/fuzzing_common.h diff --git a/test/core/end2end/fuzzers/BUILD b/test/core/end2end/fuzzers/BUILD index 0906719a8c5..04d3235bc5f 100644 --- a/test/core/end2end/fuzzers/BUILD +++ b/test/core/end2end/fuzzers/BUILD @@ -12,24 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_package") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_package", "grpc_proto_library") load("//test/core/util:grpc_fuzzer.bzl", "grpc_fuzzer", "grpc_proto_fuzzer") grpc_package(name = "test/core/end2end/fuzzers") licenses(["notice"]) +grpc_proto_library( + name = "api_fuzzer_proto", + srcs = ["api_fuzzer.proto"], + has_services = False, + deps = [ + "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + "//test/core/util:fuzz_config_vars_proto", + ], +) + +grpc_cc_library( + name = "fuzzing_common", + srcs = ["fuzzing_common.cc"], + hdrs = ["fuzzing_common.h"], + external_deps = ["absl/strings"], + deps = [ + "api_fuzzer_proto", + "//:gpr", + "//:grpc", + ], +) + grpc_proto_fuzzer( name = "api_fuzzer", size = "enormous", srcs = ["api_fuzzer.cc"], corpus = "api_fuzzer_corpus", language = "C++", - proto = "api_fuzzer.proto", - proto_deps = [ - "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", - "//test/core/util:fuzz_config_vars_proto", - ], + proto = None, tags = [ "no_mac", "no_windows", @@ -37,6 +55,8 @@ grpc_proto_fuzzer( uses_event_engine = False, uses_polling = False, deps = [ + "api_fuzzer_proto", + "fuzzing_common", "//:gpr", "//:grpc", "//src/core:channel_args", diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 05e3b958c55..cc1aa0a3294 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -73,6 +73,7 @@ #include "src/libfuzzer/libfuzzer_macro.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/end2end/fuzzers/api_fuzzer.pb.h" +#include "test/core/end2end/fuzzers/fuzzing_common.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" #include "test/core/util/fuzz_config_vars.h" @@ -95,15 +96,6 @@ bool leak_check = true; static void dont_log(gpr_log_func_args* /*args*/) {} -//////////////////////////////////////////////////////////////////////////////// -// global state - -static grpc_server* g_server; -static grpc_channel* g_channel; -static grpc_resource_quota* g_resource_quota; -static std::vector g_channel_actions; -static std::atomic g_channel_force_delete{false}; - //////////////////////////////////////////////////////////////////////////////// // dns resolution @@ -249,6 +241,73 @@ grpc_ares_request* my_dns_lookup_ares( static void my_cancel_ares_request(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } + +//////////////////////////////////////////////////////////////////////////////// +// globals + +class Call; + +namespace grpc { +namespace testing { + +class ApiFuzzer : public BasicFuzzer { + public: + ApiFuzzer(); + ~ApiFuzzer() override; + Call* ActiveCall(); + bool Continue(); + void TryShutdown(); + void Tick(); + grpc_server* Server() { return server_; } + const std::vector& ChannelActions() { + return channel_actions_; + } + + private: + Result PollCq() override; + Result CreateChannel( + const api_fuzzer::CreateChannel& create_channel) override; + + Result CloseChannel() override; + Result CreateServer(const api_fuzzer::CreateServer& create_server) override; + Result ShutdownServer() override; + Result CancelAllCallsIfShutdown() override; + Result DestroyServerIfReady() override; + Result CheckConnectivity(bool try_to_connect) override; + Result WatchConnectivity(uint32_t duration_us) override; + Result CreateCall(const api_fuzzer::CreateCall& create_call) override; + Result QueueBatchForActiveCall(const api_fuzzer::Batch& queue_batch) override; + Result ChangeActiveCall() override; + Result CancelActiveCall() override; + Result SendPingOnChannel() override; + Result ServerRequestCall() override; + Result DestroyActiveCall() override; + Result ValidatePeerForActiveCall() override; + Result ValidateChannelTarget() override; + Result ResizeResourceQuota(uint32_t resize_resource_quota) override; + + std::shared_ptr engine_; + grpc_completion_queue* cq_ = nullptr; + grpc_server* server_ = nullptr; + grpc_channel* channel_ = nullptr; + grpc_resource_quota* resource_quota_; + std::vector channel_actions_; + std::atomic channel_force_delete_{false}; + std::vector> calls_; + size_t active_call_ = 0; + bool server_shutdown_ = false; + int pending_server_shutdowns_ = 0; + int pending_channel_watches_ = 0; + int pending_pings_ = 0; +}; + +} // namespace testing +} // namespace grpc + +namespace { +grpc::testing::ApiFuzzer* g_api_fuzzer = nullptr; +} + //////////////////////////////////////////////////////////////////////////////// // client connection @@ -262,15 +321,16 @@ typedef struct { } future_connect; static void do_connect(future_connect fc) { - if (g_server != nullptr) { + if (g_api_fuzzer->Server() != nullptr) { grpc_endpoint* client; grpc_endpoint* server; grpc_passthru_endpoint_create(&client, &server, nullptr, true); *fc.ep = client; - start_scheduling_grpc_passthru_endpoint_channel_effects(client, - g_channel_actions); + start_scheduling_grpc_passthru_endpoint_channel_effects( + client, g_api_fuzzer->ChannelActions()); - grpc_core::Server* core_server = grpc_core::Server::FromC(g_server); + grpc_core::Server* core_server = + grpc_core::Server::FromC(g_api_fuzzer->Server()); grpc_transport* transport = grpc_create_chttp2_transport( core_server->channel_args(), server, false); GPR_ASSERT(GRPC_LOG_IF_ERROR( @@ -520,8 +580,8 @@ class Call : public std::enable_shared_from_this { case api_fuzzer::BatchOp::kReceiveMessage: // Allow only one active pending_recv_message_op to exist. Otherwise if // the previous enqueued recv_message_op is not complete by the time - // we get here, then under certain conditions, enqueing this op will - // over-write the internal call->receiving_buffer maintained by grpc + // we get here, then under certain conditions, enqueuing this op will + // overwrite the internal call->receiving_buffer maintained by grpc // leading to a memory leak. if (call_closed_ || pending_recv_message_op_) { *batch_is_ok = false; @@ -613,22 +673,6 @@ class Call : public std::enable_shared_from_this { std::vector unref_slices_; }; -static std::vector> g_calls; -static size_t g_active_call = 0; - -static Call* ActiveCall() { - while (!g_calls.empty()) { - if (g_active_call >= g_calls.size()) { - g_active_call = 0; - } - if (g_calls[g_active_call] != nullptr && !g_calls[g_active_call]->done()) { - return g_calls[g_active_call].get(); - } - g_calls.erase(g_calls.begin() + g_active_call); - } - return nullptr; -} - Call::~Call() { if (call_ != nullptr) { grpc_call_unref(call_); @@ -653,7 +697,8 @@ Call::~Call() { } template -grpc_channel_args* ReadArgs(const ChannelArgContainer& args) { +grpc_channel_args* ReadArgs(grpc_resource_quota* resource_quota, + const ChannelArgContainer& args) { grpc_channel_args* res = static_cast(gpr_malloc(sizeof(grpc_channel_args))); res->args = @@ -671,9 +716,9 @@ grpc_channel_args* ReadArgs(const ChannelArgContainer& args) { break; case api_fuzzer::ChannelArg::kResourceQuota: if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue; - grpc_resource_quota_ref(g_resource_quota); + grpc_resource_quota_ref(resource_quota); res->args[j].type = GRPC_ARG_POINTER; - res->args[j].value.pointer.p = g_resource_quota; + res->args[j].value.pointer.p = resource_quota; res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable(); break; case api_fuzzer::ChannelArg::VALUE_NOT_SET: @@ -802,19 +847,11 @@ static grpc_channel_credentials* ReadChannelCreds( } } -DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { - if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { - gpr_set_log_function(dont_log); - } - if (msg.has_config_vars()) { - grpc_core::ApplyFuzzConfigVars(msg.config_vars()); - } - grpc_event_engine::experimental::SetEventEngineFactory( - [actions = msg.event_engine_actions()]() { - return std::make_unique( - FuzzingEventEngine::Options(), actions); - }); - auto engine = +namespace grpc { +namespace testing { + +ApiFuzzer::ApiFuzzer() { + engine_ = std::dynamic_pointer_cast(GetDefaultEventEngine()); grpc_init(); grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); @@ -824,392 +861,404 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_core::Executor::SetThreadingAll(false); } grpc_core::ResetDNSResolver( - std::make_unique(engine.get())); + std::make_unique(engine_.get())); grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_cancel_ares_request = my_cancel_ares_request; - GPR_ASSERT(g_channel == nullptr); - GPR_ASSERT(g_server == nullptr); + GPR_ASSERT(channel_ == nullptr); + GPR_ASSERT(server_ == nullptr); - bool server_shutdown = false; - int pending_server_shutdowns = 0; - int pending_channel_watches = 0; - int pending_pings = 0; + resource_quota_ = grpc_resource_quota_create("api_fuzzer"); + cq_ = grpc_completion_queue_create_for_next(nullptr); +} - g_resource_quota = grpc_resource_quota_create("api_fuzzer"); +Call* ApiFuzzer::ActiveCall() { + while (!calls_.empty()) { + if (active_call_ >= calls_.size()) { + active_call_ = 0; + } + if (calls_[active_call_] != nullptr && !calls_[active_call_]->done()) { + return calls_[active_call_].get(); + } + calls_.erase(calls_.begin() + active_call_); + } + return nullptr; +} - grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); +bool ApiFuzzer::Continue() { + return channel_ != nullptr || server_ != nullptr || + pending_channel_watches_ > 0 || pending_pings_ > 0 || + ActiveCall() != nullptr; +} - int action_index = 0; - auto no_more_actions = [&]() { action_index = msg.actions_size(); }; - auto poll_cq = [&]() -> bool { - grpc_event ev = grpc_completion_queue_next( - cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); - switch (ev.type) { - case GRPC_OP_COMPLETE: { - static_cast(ev.tag)->Run(ev.success); - break; - } - case GRPC_QUEUE_TIMEOUT: - break; - case GRPC_QUEUE_SHUTDOWN: - return true; +void ApiFuzzer::TryShutdown() { + engine_->FuzzingDone(); + if (channel_ != nullptr) { + grpc_channel_destroy(channel_); + channel_ = nullptr; + } + if (server_ != nullptr) { + if (!server_shutdown_) { + grpc_server_shutdown_and_notify( + server_, cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_)); + server_shutdown_ = true; + pending_server_shutdowns_++; + } else if (pending_server_shutdowns_ == 0) { + grpc_server_destroy(server_); + server_ = nullptr; } - return false; - }; + } + for (auto& call : calls_) { + if (call == nullptr) continue; + if (call->type() == CallType::PENDING_SERVER) continue; + call->Shutdown(); + } - while (action_index < msg.actions_size() || g_channel != nullptr || - g_server != nullptr || pending_channel_watches > 0 || - pending_pings > 0 || ActiveCall() != nullptr) { - engine->Tick(); + grpc_timer_manager_tick(); + GPR_ASSERT(PollCq() == Result::kPending); +} - if (action_index == msg.actions_size()) { - engine->FuzzingDone(); - if (g_channel != nullptr) { - grpc_channel_destroy(g_channel); - g_channel = nullptr; - } - if (g_server != nullptr) { - if (!server_shutdown) { - grpc_server_shutdown_and_notify( - g_server, cq, - AssertSuccessAndDecrement(&pending_server_shutdowns)); - server_shutdown = true; - pending_server_shutdowns++; - } else if (pending_server_shutdowns == 0) { - grpc_server_destroy(g_server); - g_server = nullptr; - } - } - for (auto& call : g_calls) { - if (call == nullptr) continue; - if (call->type() == CallType::PENDING_SERVER) continue; - call->Shutdown(); - } +ApiFuzzer::~ApiFuzzer() { + GPR_ASSERT(channel_ == nullptr); + GPR_ASSERT(server_ == nullptr); + GPR_ASSERT(ActiveCall() == nullptr); + GPR_ASSERT(calls_.empty()); - grpc_timer_manager_tick(); - GPR_ASSERT(!poll_cq()); - continue; + grpc_completion_queue_shutdown(cq_); + GPR_ASSERT(PollCq() == Result::kComplete); + grpc_completion_queue_destroy(cq_); + + grpc_resource_quota_unref(resource_quota_); + grpc_shutdown_blocking(); + engine_->UnsetGlobalHooks(); +} + +void ApiFuzzer::Tick() { + engine_->Tick(); + grpc_timer_manager_tick(); + if (channel_force_delete_.exchange(false) && channel_) { + grpc_channel_destroy(channel_); + channel_ = nullptr; + channel_actions_.clear(); + } +} + +ApiFuzzer::Result ApiFuzzer::PollCq() { + grpc_event ev = grpc_completion_queue_next( + cq_, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); + switch (ev.type) { + case GRPC_OP_COMPLETE: { + static_cast(ev.tag)->Run(ev.success); + break; } + case GRPC_QUEUE_TIMEOUT: + break; + case GRPC_QUEUE_SHUTDOWN: + return Result::kComplete; + } + return Result::kPending; +} +ApiFuzzer::Result ApiFuzzer::CreateChannel( + const api_fuzzer::CreateChannel& create_channel) { + if (!create_channel.channel_actions_size() || channel_ != nullptr) { + return Result::kFailed; + } else { + grpc_channel_args* args = + ReadArgs(resource_quota_, create_channel.channel_args()); + grpc_channel_credentials* creds = + create_channel.has_channel_creds() + ? ReadChannelCreds(create_channel.channel_creds()) + : grpc_insecure_credentials_create(); + channel_ = + grpc_channel_create(create_channel.target().c_str(), creds, args); + grpc_channel_credentials_release(creds); + channel_actions_.clear(); + for (int i = 0; i < create_channel.channel_actions_size(); i++) { + const api_fuzzer::ChannelAction& channel_action = + create_channel.channel_actions(i); + channel_actions_.push_back({ + std::min(channel_action.wait_ms(), kMaxWaitMs), + std::min(channel_action.add_n_bytes_writable(), + kMaxAddNWritableBytes), + std::min(channel_action.add_n_bytes_readable(), + kMaxAddNReadableBytes), + }); + } + GPR_ASSERT(channel_ != nullptr); + channel_force_delete_ = false; + { + grpc_core::ExecCtx exec_ctx; + grpc_channel_args_destroy(args); + } + } + return Result::kComplete; +} - grpc_timer_manager_tick(); +ApiFuzzer::Result ApiFuzzer::CloseChannel() { + if (channel_ != nullptr) { + grpc_channel_destroy(channel_); + channel_ = nullptr; + } else { + return Result::kFailed; + } + return Result::kComplete; +} - if (g_channel_force_delete.exchange(false) && g_channel) { - grpc_channel_destroy(g_channel); - g_channel = nullptr; - g_channel_actions.clear(); +ApiFuzzer::Result ApiFuzzer::CreateServer( + const api_fuzzer::CreateServer& create_server) { + if (server_ == nullptr) { + grpc_channel_args* args = + ReadArgs(resource_quota_, create_server.channel_args()); + server_ = grpc_server_create(args, nullptr); + GPR_ASSERT(server_ != nullptr); + { + grpc_core::ExecCtx exec_ctx; + grpc_channel_args_destroy(args); } + grpc_server_register_completion_queue(server_, cq_, nullptr); + grpc_server_start(server_); + server_shutdown_ = false; + GPR_ASSERT(pending_server_shutdowns_ == 0); + } else { + return Result::kFailed; + } + return Result::kComplete; +} - const api_fuzzer::Action& action = msg.actions(action_index); - action_index++; - switch (action.type_case()) { - case api_fuzzer::Action::TYPE_NOT_SET: - no_more_actions(); - break; - // tickle completion queue - case api_fuzzer::Action::kPollCq: { - GPR_ASSERT(!poll_cq()); - break; - } - // create an insecure channel - case api_fuzzer::Action::kCreateChannel: { - if (!action.create_channel().channel_actions_size() || - g_channel != nullptr) { - no_more_actions(); - } else { - grpc_channel_args* args = - ReadArgs(action.create_channel().channel_args()); - grpc_channel_credentials* creds = - action.create_channel().has_channel_creds() - ? ReadChannelCreds(action.create_channel().channel_creds()) - : grpc_insecure_credentials_create(); - g_channel = grpc_channel_create( - action.create_channel().target().c_str(), creds, args); - grpc_channel_credentials_release(creds); - g_channel_actions.clear(); - for (int i = 0; i < action.create_channel().channel_actions_size(); - i++) { - const api_fuzzer::ChannelAction& channel_action = - action.create_channel().channel_actions(i); - g_channel_actions.push_back({ - std::min(channel_action.wait_ms(), kMaxWaitMs), - std::min(channel_action.add_n_bytes_writable(), - kMaxAddNWritableBytes), - std::min(channel_action.add_n_bytes_readable(), - kMaxAddNReadableBytes), - }); - } - GPR_ASSERT(g_channel != nullptr); - g_channel_force_delete = false; - { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); - } - } - break; - } - // destroy a channel - case api_fuzzer::Action::kCloseChannel: { - if (g_channel != nullptr) { - grpc_channel_destroy(g_channel); - g_channel = nullptr; - } else { - no_more_actions(); - } - break; - } - // bring up a server - case api_fuzzer::Action::kCreateServer: { - if (g_server == nullptr) { - grpc_channel_args* args = - ReadArgs(action.create_server().channel_args()); - g_server = grpc_server_create(args, nullptr); - GPR_ASSERT(g_server != nullptr); - { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); - } - grpc_server_register_completion_queue(g_server, cq, nullptr); - grpc_server_start(g_server); - server_shutdown = false; - GPR_ASSERT(pending_server_shutdowns == 0); - } else { - no_more_actions(); - } - break; - } - // begin server shutdown - case api_fuzzer::Action::kShutdownServer: { - if (g_server != nullptr) { - grpc_server_shutdown_and_notify( - g_server, cq, - AssertSuccessAndDecrement(&pending_server_shutdowns)); - pending_server_shutdowns++; - server_shutdown = true; - } else { - no_more_actions(); - } - break; - } - // cancel all calls if shutdown - case api_fuzzer::Action::kCancelAllCallsIfShutdown: { - if (g_server != nullptr && server_shutdown) { - grpc_server_cancel_all_calls(g_server); - } else { - no_more_actions(); - } - break; - } - // destroy server - case api_fuzzer::Action::kDestroyServerIfReady: { - if (g_server != nullptr && server_shutdown && - pending_server_shutdowns == 0) { - grpc_server_destroy(g_server); - g_server = nullptr; - } else { - no_more_actions(); - } - break; - } - // check connectivity - case api_fuzzer::Action::kCheckConnectivity: { - if (g_channel != nullptr) { - grpc_channel_check_connectivity_state(g_channel, - action.check_connectivity()); - } else { - no_more_actions(); - } - break; - } - // watch connectivity - case api_fuzzer::Action::kWatchConnectivity: { - if (g_channel != nullptr) { - grpc_connectivity_state st = - grpc_channel_check_connectivity_state(g_channel, 0); - if (st != GRPC_CHANNEL_SHUTDOWN) { - gpr_timespec deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(action.watch_connectivity(), - GPR_TIMESPAN)); - grpc_channel_watch_connectivity_state( - g_channel, st, deadline, cq, - ValidateConnectivityWatch(deadline, &pending_channel_watches)); - pending_channel_watches++; - } - } else { - no_more_actions(); - } - break; - } - // create a call - case api_fuzzer::Action::kCreateCall: { - bool ok = true; - if (g_channel == nullptr) ok = false; - // If the active call is a server call, then use it as the parent call - // to exercise the propagation logic. - Call* parent_call = ActiveCall(); - if (parent_call != nullptr && parent_call->type() != CallType::SERVER) { - parent_call = nullptr; - } - g_calls.emplace_back(new Call(CallType::CLIENT)); - grpc_slice method = - g_calls.back()->ReadSlice(action.create_call().method()); - if (GRPC_SLICE_LENGTH(method) == 0) { - ok = false; - } - grpc_slice host = - g_calls.back()->ReadSlice(action.create_call().host()); - gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(action.create_call().timeout(), GPR_TIMESPAN)); - - if (ok) { - g_calls.back()->SetCall(grpc_channel_create_call( - g_channel, parent_call == nullptr ? nullptr : parent_call->call(), - action.create_call().propagation_mask(), cq, method, &host, - deadline, nullptr)); - } else { - g_calls.pop_back(); - no_more_actions(); - } - break; - } - // switch the 'current' call - case api_fuzzer::Action::kChangeActiveCall: { - g_active_call++; - ActiveCall(); - break; - } - // queue some ops on a call - case api_fuzzer::Action::kQueueBatch: { - auto* active_call = ActiveCall(); - if (active_call == nullptr || - active_call->type() == CallType::PENDING_SERVER || - active_call->call() == nullptr) { - no_more_actions(); - break; - } - const auto& batch = action.queue_batch().operations(); - if (batch.size() > 6) { - no_more_actions(); - break; - } - std::vector ops; - bool ok = true; - uint8_t has_ops = 0; - std::vector> unwinders; - for (const auto& batch_op : batch) { - auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders); - if (!op.has_value()) continue; - ops.push_back(*op); - } +ApiFuzzer::Result ApiFuzzer::ShutdownServer() { + if (server_ != nullptr) { + grpc_server_shutdown_and_notify( + server_, cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_)); + pending_server_shutdowns_++; + server_shutdown_ = true; + } else { + return Result::kFailed; + } + return Result::kComplete; +} - if (g_channel == nullptr) ok = false; - if (ok) { - auto* v = active_call->FinishedBatchValidator(has_ops); - grpc_call_error error = grpc_call_start_batch( - active_call->call(), ops.data(), ops.size(), v, nullptr); - if (error != GRPC_CALL_OK) { - v->Run(false); - } - } else { - no_more_actions(); - for (auto& unwind : unwinders) { - unwind(); - } - } - break; - } - // cancel current call - case api_fuzzer::Action::kCancelCall: { - auto* active_call = ActiveCall(); - if (active_call != nullptr && active_call->call() != nullptr) { - grpc_call_cancel(active_call->call(), nullptr); - } else { - no_more_actions(); - } - break; - } - // get a calls peer - case api_fuzzer::Action::kGetPeer: { - auto* active_call = ActiveCall(); - if (active_call != nullptr && active_call->call() != nullptr) { - free_non_null(grpc_call_get_peer(active_call->call())); - } else { - no_more_actions(); - } - break; - } - // get a channels target - case api_fuzzer::Action::kGetTarget: { - if (g_channel != nullptr) { - free_non_null(grpc_channel_get_target(g_channel)); - } else { - no_more_actions(); - } - break; - } - // send a ping on a channel - case api_fuzzer::Action::kPing: { - if (g_channel != nullptr) { - pending_pings++; - grpc_channel_ping(g_channel, cq, Decrement(&pending_pings), nullptr); - } else { - no_more_actions(); - } - break; - } - // enable a tracer - case api_fuzzer::Action::kEnableTracer: { - grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1); - break; - } - // disable a tracer - case api_fuzzer::Action::kDisableTracer: { - grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0); - break; - } - // request a server call - case api_fuzzer::Action::kRequestCall: { - if (g_server == nullptr) { - no_more_actions(); - break; - } - g_calls.emplace_back(new Call(CallType::PENDING_SERVER)); - g_calls.back()->RequestCall(g_server, cq); - break; - } - // destroy a call - case api_fuzzer::Action::kDestroyCall: { - auto* active_call = ActiveCall(); - if (active_call != nullptr && - active_call->type() != CallType::PENDING_SERVER && - active_call->call() != nullptr) { - g_calls[g_active_call]->Shutdown(); - } else { - no_more_actions(); - } - break; - } - // resize the buffer pool - case api_fuzzer::Action::kResizeResourceQuota: { - grpc_resource_quota_resize(g_resource_quota, - action.resize_resource_quota()); - break; - } +ApiFuzzer::Result ApiFuzzer::CancelAllCallsIfShutdown() { + if (server_ != nullptr && server_shutdown_) { + grpc_server_cancel_all_calls(server_); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::DestroyServerIfReady() { + if (server_ != nullptr && server_shutdown_ && + pending_server_shutdowns_ == 0) { + grpc_server_destroy(server_); + server_ = nullptr; + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::CheckConnectivity(bool try_to_connect) { + if (channel_ != nullptr) { + grpc_channel_check_connectivity_state(channel_, try_to_connect); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::WatchConnectivity(uint32_t duration_us) { + if (channel_ != nullptr) { + grpc_connectivity_state st = + grpc_channel_check_connectivity_state(channel_, 0); + if (st != GRPC_CHANNEL_SHUTDOWN) { + gpr_timespec deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(duration_us, GPR_TIMESPAN)); + grpc_channel_watch_connectivity_state( + channel_, st, deadline, cq_, + ValidateConnectivityWatch(deadline, &pending_channel_watches_)); + pending_channel_watches_++; } + } else { + return Result::kFailed; } + return Result::kComplete; +} - GPR_ASSERT(g_channel == nullptr); - GPR_ASSERT(g_server == nullptr); - GPR_ASSERT(ActiveCall() == nullptr); - GPR_ASSERT(g_calls.empty()); +ApiFuzzer::Result ApiFuzzer::CreateCall( + const api_fuzzer::CreateCall& create_call) { + bool ok = true; + if (channel_ == nullptr) ok = false; + // If the active call is a server call, then use it as the parent call + // to exercise the propagation logic. + Call* parent_call = ActiveCall(); + if (parent_call != nullptr && parent_call->type() != CallType::SERVER) { + parent_call = nullptr; + } + calls_.emplace_back(new Call(CallType::CLIENT)); + grpc_slice method = calls_.back()->ReadSlice(create_call.method()); + if (GRPC_SLICE_LENGTH(method) == 0) { + ok = false; + } + grpc_slice host = calls_.back()->ReadSlice(create_call.host()); + gpr_timespec deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(create_call.timeout(), GPR_TIMESPAN)); + + if (ok) { + calls_.back()->SetCall(grpc_channel_create_call( + channel_, parent_call == nullptr ? nullptr : parent_call->call(), + create_call.propagation_mask(), cq_, method, &host, deadline, nullptr)); + } else { + calls_.pop_back(); + return Result::kFailed; + } + return Result::kComplete; +} - grpc_completion_queue_shutdown(cq); - GPR_ASSERT(poll_cq()); - grpc_completion_queue_destroy(cq); +ApiFuzzer::Result ApiFuzzer::ChangeActiveCall() { + active_call_++; + ActiveCall(); + return Result::kComplete; +} - grpc_resource_quota_unref(g_resource_quota); - grpc_shutdown_blocking(); - engine->UnsetGlobalHooks(); +ApiFuzzer::Result ApiFuzzer::QueueBatchForActiveCall( + const api_fuzzer::Batch& queue_batch) { + auto* active_call = ActiveCall(); + if (active_call == nullptr || + active_call->type() == CallType::PENDING_SERVER || + active_call->call() == nullptr) { + return Result::kFailed; + } + const auto& batch = queue_batch.operations(); + if (batch.size() > 6) { + return Result::kFailed; + } + std::vector ops; + bool ok = true; + uint8_t has_ops = 0; + std::vector> unwinders; + for (const auto& batch_op : batch) { + auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders); + if (!op.has_value()) continue; + ops.push_back(*op); + } + + if (channel_ == nullptr) ok = false; + if (ok) { + auto* v = active_call->FinishedBatchValidator(has_ops); + grpc_call_error error = grpc_call_start_batch( + active_call->call(), ops.data(), ops.size(), v, nullptr); + if (error != GRPC_CALL_OK) { + v->Run(false); + } + } else { + for (auto& unwind : unwinders) { + unwind(); + } + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::CancelActiveCall() { + auto* active_call = ActiveCall(); + if (active_call != nullptr && active_call->call() != nullptr) { + grpc_call_cancel(active_call->call(), nullptr); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::SendPingOnChannel() { + if (channel_ != nullptr) { + pending_pings_++; + grpc_channel_ping(channel_, cq_, Decrement(&pending_pings_), nullptr); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::ServerRequestCall() { + if (server_ == nullptr) { + return Result::kFailed; + } + calls_.emplace_back(new Call(CallType::PENDING_SERVER)); + calls_.back()->RequestCall(server_, cq_); + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::DestroyActiveCall() { + auto* active_call = ActiveCall(); + if (active_call != nullptr && + active_call->type() != CallType::PENDING_SERVER && + active_call->call() != nullptr) { + calls_[active_call_]->Shutdown(); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::ValidatePeerForActiveCall() { + auto* active_call = ActiveCall(); + if (active_call != nullptr && active_call->call() != nullptr) { + free_non_null(grpc_call_get_peer(active_call->call())); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::ValidateChannelTarget() { + if (channel_ != nullptr) { + free_non_null(grpc_channel_get_target(channel_)); + } else { + return Result::kFailed; + } + return Result::kComplete; +} + +ApiFuzzer::Result ApiFuzzer::ResizeResourceQuota( + uint32_t resize_resource_quota) { + grpc_resource_quota_resize(resource_quota_, resize_resource_quota); + return Result::kComplete; +} + +} // namespace testing +} // namespace grpc + +DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { + if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { + gpr_set_log_function(dont_log); + } + if (msg.has_config_vars()) { + grpc_core::ApplyFuzzConfigVars(msg.config_vars()); + } + grpc_event_engine::experimental::SetEventEngineFactory( + [actions = msg.event_engine_actions()]() { + return std::make_unique( + FuzzingEventEngine::Options(), actions); + }); + + g_api_fuzzer = new grpc::testing::ApiFuzzer(); + int action_index = 0; + auto no_more_actions = [&]() { action_index = msg.actions_size(); }; + + while (action_index < msg.actions_size() || g_api_fuzzer->Continue()) { + g_api_fuzzer->Tick(); + + if (action_index == msg.actions_size()) { + g_api_fuzzer->TryShutdown(); + continue; + } + + auto result = g_api_fuzzer->ExecuteAction(msg.actions(action_index++)); + if (result == grpc::testing::ApiFuzzer::Result::kFailed) { + no_more_actions(); + } + } + delete g_api_fuzzer; } diff --git a/test/core/end2end/fuzzers/fuzzing_common.cc b/test/core/end2end/fuzzers/fuzzing_common.cc new file mode 100644 index 00000000000..8cb93fdb4d4 --- /dev/null +++ b/test/core/end2end/fuzzers/fuzzing_common.cc @@ -0,0 +1,113 @@ +// +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "test/core/end2end/fuzzers/fuzzing_common.h" + +#include + +#include "absl/strings/str_cat.h" + +#include + +#include "src/core/lib/gprpp/crash.h" +#include "test/core/end2end/fuzzers/api_fuzzer.pb.h" + +namespace grpc { +namespace testing { + +BasicFuzzer::Result BasicFuzzer::ExecuteAction( + const api_fuzzer::Action& action) { + switch (action.type_case()) { + case api_fuzzer::Action::TYPE_NOT_SET: + return BasicFuzzer::Result::kFailed; + // tickle completion queue + case api_fuzzer::Action::kPollCq: + return PollCq(); + // create an insecure channel + case api_fuzzer::Action::kCreateChannel: + return CreateChannel(action.create_channel()); + // destroy a channel + case api_fuzzer::Action::kCloseChannel: + return CloseChannel(); + // bring up a server + case api_fuzzer::Action::kCreateServer: + return CreateServer(action.create_server()); + // begin server shutdown + case api_fuzzer::Action::kShutdownServer: + return ShutdownServer(); + // cancel all calls if server is shutdown + case api_fuzzer::Action::kCancelAllCallsIfShutdown: + return CancelAllCallsIfShutdown(); + // destroy server + case api_fuzzer::Action::kDestroyServerIfReady: + return DestroyServerIfReady(); + // check connectivity + case api_fuzzer::Action::kCheckConnectivity: + return CheckConnectivity(action.check_connectivity()); + // watch connectivity + case api_fuzzer::Action::kWatchConnectivity: + return WatchConnectivity(action.watch_connectivity()); + // create a call + case api_fuzzer::Action::kCreateCall: + return CreateCall(action.create_call()); + // switch the 'current' call + case api_fuzzer::Action::kChangeActiveCall: + return ChangeActiveCall(); + // queue some ops on a call + case api_fuzzer::Action::kQueueBatch: + return QueueBatchForActiveCall(action.queue_batch()); + // cancel current call + case api_fuzzer::Action::kCancelCall: + return CancelActiveCall(); + // get a calls peer + case api_fuzzer::Action::kGetPeer: + return ValidatePeerForActiveCall(); + // get a channels target + case api_fuzzer::Action::kGetTarget: + return ValidateChannelTarget(); + // send a ping on a channel + case api_fuzzer::Action::kPing: + return SendPingOnChannel(); + // enable a tracer + case api_fuzzer::Action::kEnableTracer: { + grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1); + break; + } + // disable a tracer + case api_fuzzer::Action::kDisableTracer: { + grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0); + break; + } + // request a server call + case api_fuzzer::Action::kRequestCall: + return ServerRequestCall(); + // destroy a call + case api_fuzzer::Action::kDestroyCall: + return DestroyActiveCall(); + // resize the buffer pool + case api_fuzzer::Action::kResizeResourceQuota: + return ResizeResourceQuota(action.resize_resource_quota()); + default: + grpc_core::Crash(absl::StrCat("Unsupported Fuzzing Action of type: ", + action.type_case())); + } + return BasicFuzzer::Result::kComplete; +} + +} // namespace testing +} // namespace grpc diff --git a/test/core/end2end/fuzzers/fuzzing_common.h b/test/core/end2end/fuzzers/fuzzing_common.h new file mode 100644 index 00000000000..8aff64f9104 --- /dev/null +++ b/test/core/end2end/fuzzers/fuzzing_common.h @@ -0,0 +1,92 @@ +// +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H +#define GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H + +#include + +#include "test/core/end2end/fuzzers/api_fuzzer.pb.h" + +namespace grpc { +namespace testing { + +class BasicFuzzer { + public: + enum Result { kPending = 0, kComplete = 1, kFailed = 2, kNotSupported = 3 }; + virtual Result ExecuteAction(const api_fuzzer::Action& action); + virtual ~BasicFuzzer() = default; + + private: + // Poll any created completion queue to drive the RPC forward. + virtual Result PollCq() = 0; + + // Channel specific actions. + // Create an active channel with the specified parameters. + virtual Result CreateChannel( + const api_fuzzer::CreateChannel& create_channel) = 0; + // Check whether the channel is connected and optionally try to connect if it + // is not connected. + virtual Result CheckConnectivity(bool try_to_connect) = 0; + // Watch whether the channel connects within the specified duration. + virtual Result WatchConnectivity(uint32_t duration_us) = 0; + // Verify that the channel target can be reliably queried. + virtual Result ValidateChannelTarget() = 0; + // Send a http ping on the channel. + virtual Result SendPingOnChannel() = 0; + // Close the active channel. + virtual Result CloseChannel() = 0; + + // Server specific actions + // Create an active server. + virtual Result CreateServer( + const api_fuzzer::CreateServer& create_server) = 0; + // Request to be notified of a new RPC on the active server. + virtual Result ServerRequestCall() = 0; + // Shutdown the active server. + virtual Result ShutdownServer() = 0; + // Destroy the active server. + virtual Result DestroyServerIfReady() = 0; + + // Call specific actions. + // Create a call on the active channel with the specified parameters. Also add + // it the list of managed calls. + virtual Result CreateCall(const api_fuzzer::CreateCall& create_call) = 0; + // Choose a different active call from the list of managed calls. + virtual Result ChangeActiveCall() = 0; + // Queue a batch of operations to be executed on the active call. + virtual Result QueueBatchForActiveCall( + const api_fuzzer::Batch& queue_batch) = 0; + // Cancel the active call. + virtual Result CancelActiveCall() = 0; + // Cancel all managed calls. + virtual Result CancelAllCallsIfShutdown() = 0; + // Validate that the peer can be reliably queried for the active call. + virtual Result ValidatePeerForActiveCall() = 0; + // Cancel and destroy the active call. + virtual Result DestroyActiveCall() = 0; + + // Other actions. + // Change the resource quota limits. + virtual Result ResizeResourceQuota(uint32_t resize_resource_quota) = 0; +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPC_TEST_CORE_END2END_FUZZERS_FUZZING_COMMON_H diff --git a/test/core/util/grpc_fuzzer.bzl b/test/core/util/grpc_fuzzer.bzl index 07ee2e4d63f..05f9f9a001e 100644 --- a/test/core/util/grpc_fuzzer.bzl +++ b/test/core/util/grpc_fuzzer.bzl @@ -59,8 +59,11 @@ def grpc_proto_fuzzer(name, corpus, proto, proto_deps = [], external_deps = [], Args: name: The name of the test. corpus: The corpus for the test. - proto: The proto for the test. - proto_deps: Deps for proto. + proto: The proto for the test. If empty, it assumes the proto dependency + is already included in the target deps. Otherwise it creates a + new grpc_proto_library with name "_{name}_proto" and makes the + fuzz target depend on it. + proto_deps: Deps for proto. Only used if proto is not empty. external_deps: External deps. srcs: The source files for the test. deps: The dependencies of the test. @@ -69,24 +72,24 @@ def grpc_proto_fuzzer(name, corpus, proto, proto_deps = [], external_deps = [], tags: The tags for the test. **kwargs: Other arguments to supply to the test. """ - PROTO_LIBRARY = "_%s_proto" % name CORPUS_DIR = native.package_name() + "/" + corpus + deps = deps + ["@com_google_libprotobuf_mutator//:libprotobuf_mutator"] - grpc_proto_library( - name = PROTO_LIBRARY, - srcs = [proto], - deps = proto_deps, - has_services = False, - ) + if proto != None: + PROTO_LIBRARY = "_%s_proto" % name + grpc_proto_library( + name = PROTO_LIBRARY, + srcs = [proto], + deps = proto_deps, + has_services = False, + ) + deps = deps + [PROTO_LIBRARY] grpc_cc_test( name = name, srcs = srcs, tags = tags + ["grpc-fuzzer", "no-cache"], - deps = deps + [ - "@com_google_libprotobuf_mutator//:libprotobuf_mutator", - PROTO_LIBRARY, - ] + select({ + deps = deps + select({ "//:grpc_build_fuzzers": [], "//conditions:default": ["//test/core/util:fuzzer_corpus_test"], }),