[fuzzers] Expand client_fuzzer, server_fuzzer (#34596)

Allow multiple writes, and allow those writes to know about http2
(leverages the new framing layer for chttp2 recently written)
pull/34607/head
Craig Tiller 1 year ago committed by GitHub
parent 0147cad52e
commit 0814055337
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      src/core/lib/slice/slice.h
  2. 16
      test/core/end2end/fuzzers/BUILD
  3. 12
      test/core/end2end/fuzzers/client_fuzzer.cc
  4. 88
      test/core/end2end/fuzzers/fuzzer_input.proto
  5. 185
      test/core/end2end/fuzzers/network_input.cc
  6. 31
      test/core/end2end/fuzzers/network_input.h
  7. 98
      test/core/end2end/fuzzers/server_fuzzer.cc
  8. 10
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  9. 4
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  10. 1
      test/core/security/ssl_server_fuzzer.cc
  11. 1
      test/core/transport/chttp2/ping_configuration_test.cc
  12. 25
      test/core/util/mock_endpoint.cc
  13. 1
      test/core/util/mock_endpoint.h

@ -294,7 +294,7 @@ class GPR_MSVC_EMPTY_BASE_CLASS_WORKAROUND MutableSlice
// Split this slice in two, returning the first n bytes and leaving the
// remainder.
MutableSlice TakeFirst(size_t n) {
return MutableSlice(grpc_slice_split_head(c_slice_ptr(), n));
return MutableSlice(NoCheck{}, grpc_slice_split_head(c_slice_ptr(), n));
}
// Iterator access to the underlying bytes
@ -306,6 +306,11 @@ class GPR_MSVC_EMPTY_BASE_CLASS_WORKAROUND MutableSlice
uint8_t& operator[](size_t i) { return mutable_data()[i]; }
using slice_detail::BaseSlice::c_slice_ptr;
private:
struct NoCheck {};
MutableSlice(NoCheck, const grpc_slice& slice)
: slice_detail::BaseSlice(slice) {}
};
class GPR_MSVC_EMPTY_BASE_CLASS_WORKAROUND Slice

@ -83,6 +83,18 @@ grpc_proto_library(
],
)
grpc_cc_library(
name = "network_input",
srcs = ["network_input.cc"],
hdrs = ["network_input.h"],
deps = [
"fuzzer_input_proto",
"//:chttp2_frame",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/util:grpc_test_util_base",
],
)
grpc_proto_fuzzer(
name = "client_fuzzer",
srcs = ["client_fuzzer.cc"],
@ -94,10 +106,12 @@ grpc_proto_fuzzer(
uses_polling = False,
deps = [
"fuzzer_input_proto",
"network_input",
"//:gpr",
"//:grpc",
"//src/core:channel_args",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/util:fuzz_config_vars",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
@ -114,9 +128,11 @@ grpc_proto_fuzzer(
uses_polling = False,
deps = [
"fuzzer_input_proto",
"network_input",
"//:gpr",
"//:grpc",
"//src/core:channel_args",
"//test/core/util:fuzz_config_vars",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],

@ -37,6 +37,7 @@
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -50,8 +51,10 @@
#include "src/core/lib/transport/transport_fwd.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/end2end/fuzzers/network_input.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/fuzz_config_vars.h"
#include "test/core/util/mock_endpoint.h"
using ::grpc_event_engine::experimental::FuzzingEventEngine;
@ -70,6 +73,8 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
grpc_core::ApplyFuzzConfigVars(msg.config_vars());
grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
grpc_event_engine::experimental::SetEventEngineFactory([]() {
return std::make_unique<FuzzingEventEngine>(
FuzzingEventEngine::Options(), fuzzing_event_engine::Actions{});
@ -150,12 +155,7 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
int requested_calls = 1;
GPR_ASSERT(GRPC_CALL_OK == error);
if (msg.network_input().has_single_read_bytes()) {
grpc_mock_endpoint_put_read(
mock_endpoint, grpc_slice_from_copied_buffer(
msg.network_input().single_read_bytes().data(),
msg.network_input().single_read_bytes().size()));
}
grpc_core::ScheduleReads(msg.network_input(), mock_endpoint, engine.get());
grpc_event ev;
while (true) {

@ -20,9 +20,97 @@ import "test/core/end2end/fuzzers/api_fuzzer.proto";
import "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto";
import "test/core/util/fuzz_config_vars.proto";
message H2DataFrame {
uint32 stream_id = 1;
bool end_of_stream = 2;
bytes payload = 3;
}
message SimpleHeader {
string key = 1;
string value = 2;
}
message SimpleHeaders {
repeated SimpleHeader headers = 1;
}
message H2HeaderFrame {
uint32 stream_id = 1;
bool end_headers = 2;
bool end_stream = 3;
oneof payload {
bytes raw_bytes = 5;
SimpleHeaders simple_header = 6;
}
}
message H2ContinuationFrame {
uint32 stream_id = 1;
bool end_headers = 2;
oneof payload {
bytes raw_bytes = 5;
SimpleHeaders simple_header = 6;
}
}
message H2RstStreamFrame {
uint32 stream_id = 1;
uint32 error_code = 2;
}
message H2Setting {
uint32 id = 1;
uint32 value = 2;
}
message H2SettingsFrame {
bool ack = 1;
repeated H2Setting settings = 2;
}
message H2PingFrame {
bool ack = 1;
uint64 opaque = 2;
}
message H2GoawayFrame {
uint32 last_stream_id = 1;
uint32 error_code = 2;
bytes debug_data = 3;
}
message H2WindowUpdateFrame {
uint32 stream_id = 1;
uint32 increment = 2;
}
message H2ClientPrefix {}
message InputSegment {
int32 delay_ms = 1;
oneof payload {
bytes raw_bytes = 2;
H2DataFrame data = 3;
H2HeaderFrame header = 4;
H2ContinuationFrame continuation = 5;
H2RstStreamFrame rst_stream = 6;
H2SettingsFrame settings = 7;
H2PingFrame ping = 8;
H2GoawayFrame goaway = 9;
H2WindowUpdateFrame window_update = 10;
H2ClientPrefix client_prefix = 11;
}
}
message InputSegments {
repeated InputSegment segments = 1;
}
message NetworkInput {
oneof value {
bytes single_read_bytes = 1;
InputSegments input_segments = 2;
}
}

@ -0,0 +1,185 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/end2end/fuzzers/network_input.h"
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
#include "absl/types/span.h"
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/util/mock_endpoint.h"
namespace grpc_core {
namespace {
grpc_slice SliceFromH2Frame(Http2Frame frame) {
SliceBuffer buffer;
Serialize(absl::Span<Http2Frame>(&frame, 1), buffer);
return buffer.JoinIntoSlice().TakeCSlice();
}
SliceBuffer SliceBufferFromBytes(const std::string& bytes) {
SliceBuffer buffer;
buffer.Append(Slice::FromCopiedString(bytes));
return buffer;
}
void AppendLength(size_t length, std::vector<uint8_t>* bytes) {
VarintWriter<1> writer(length);
uint8_t buffer[8];
writer.Write(0, buffer);
bytes->insert(bytes->end(), buffer, buffer + writer.length());
}
SliceBuffer SliceBufferFromSimpleHeaders(
const fuzzer_input::SimpleHeaders& headers) {
std::vector<uint8_t> temp;
for (const auto& header : headers.headers()) {
temp.push_back(0);
AppendLength(header.key().length(), &temp);
temp.insert(temp.end(), header.key().begin(), header.key().end());
AppendLength(header.value().length(), &temp);
temp.insert(temp.end(), header.value().begin(), header.value().end());
}
SliceBuffer buffer;
buffer.Append(Slice::FromCopiedBuffer(temp.data(), temp.size()));
return buffer;
}
template <typename T>
SliceBuffer SliceBufferFromHeaderPayload(const T& payload) {
switch (payload.payload_case()) {
case T::kRawBytes:
return SliceBufferFromBytes(payload.raw_bytes());
case T::kSimpleHeader:
return SliceBufferFromSimpleHeaders(payload.simple_header());
case T::PAYLOAD_NOT_SET:
break;
}
return SliceBuffer();
}
grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
switch (segment.payload_case()) {
case fuzzer_input::InputSegment::kRawBytes:
return grpc_slice_from_copied_buffer(segment.raw_bytes().data(),
segment.raw_bytes().size());
case fuzzer_input::InputSegment::kData:
return SliceFromH2Frame(Http2DataFrame{
segment.data().stream_id(), segment.data().end_of_stream(),
SliceBufferFromBytes(segment.data().payload())});
case fuzzer_input::InputSegment::kHeader:
return SliceFromH2Frame(Http2HeaderFrame{
segment.header().stream_id(),
segment.header().end_headers(),
segment.header().end_stream(),
SliceBufferFromHeaderPayload(segment.header()),
});
case fuzzer_input::InputSegment::kContinuation:
return SliceFromH2Frame(Http2ContinuationFrame{
segment.continuation().stream_id(),
segment.continuation().end_headers(),
SliceBufferFromHeaderPayload(segment.header()),
});
case fuzzer_input::InputSegment::kRstStream:
return SliceFromH2Frame(Http2RstStreamFrame{
segment.rst_stream().stream_id(),
segment.rst_stream().error_code(),
});
case fuzzer_input::InputSegment::kSettings: {
std::vector<Http2SettingsFrame::Setting> settings;
for (const auto& setting : segment.settings().settings()) {
settings.push_back(Http2SettingsFrame::Setting{
static_cast<uint16_t>(setting.id()),
setting.value(),
});
}
return SliceFromH2Frame(Http2SettingsFrame{
segment.settings().ack(),
std::move(settings),
});
}
case fuzzer_input::InputSegment::kPing:
return SliceFromH2Frame(Http2PingFrame{
segment.ping().ack(),
segment.ping().opaque(),
});
case fuzzer_input::InputSegment::kGoaway:
return SliceFromH2Frame(Http2GoawayFrame{
segment.goaway().last_stream_id(), segment.goaway().error_code(),
Slice::FromCopiedString(segment.goaway().debug_data())});
case fuzzer_input::InputSegment::kWindowUpdate:
return SliceFromH2Frame(Http2WindowUpdateFrame{
segment.window_update().stream_id(),
segment.window_update().increment(),
});
case fuzzer_input::InputSegment::kClientPrefix:
return grpc_slice_from_static_string("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
break;
}
return grpc_empty_slice();
}
} // namespace
void ScheduleReads(
const fuzzer_input::NetworkInput& network_input,
grpc_endpoint* mock_endpoint,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
switch (network_input.value_case()) {
case fuzzer_input::NetworkInput::kSingleReadBytes: {
grpc_mock_endpoint_put_read(
mock_endpoint, grpc_slice_from_copied_buffer(
network_input.single_read_bytes().data(),
network_input.single_read_bytes().size()));
grpc_mock_endpoint_finish_put_reads(mock_endpoint);
} break;
case fuzzer_input::NetworkInput::kInputSegments: {
int delay_ms = 0;
for (const auto& segment : network_input.input_segments().segments()) {
delay_ms += Clamp(segment.delay_ms(), 0, 1000);
event_engine->RunAfterExactly(
std::chrono::milliseconds(delay_ms), [mock_endpoint, segment] {
grpc_mock_endpoint_put_read(mock_endpoint,
SliceFromSegment(segment));
});
}
event_engine->RunAfterExactly(
std::chrono::milliseconds(delay_ms + 1), [mock_endpoint] {
grpc_mock_endpoint_finish_put_reads(mock_endpoint);
});
} break;
case fuzzer_input::NetworkInput::VALUE_NOT_SET:
grpc_mock_endpoint_finish_put_reads(mock_endpoint);
break;
}
}
} // namespace grpc_core

@ -0,0 +1,31 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_END2END_FUZZERS_NETWORK_INPUT_H
#define GRPC_TEST_CORE_END2END_FUZZERS_NETWORK_INPUT_H
#include "src/core/lib/iomgr/endpoint.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
namespace grpc_core {
void ScheduleReads(
const fuzzer_input::NetworkInput& network_input,
grpc_endpoint* mock_endpoint,
grpc_event_engine::experimental::FuzzingEventEngine* event_engine);
}
#endif // GRPC_TEST_CORE_END2END_FUZZERS_NETWORK_INPUT_H

@ -14,8 +14,12 @@
#include <stdint.h>
#include <string>
#include <memory>
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
@ -26,18 +30,29 @@
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/surface/event_string.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
#include "test/core/end2end/fuzzers/network_input.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/fuzz_config_vars.h"
#include "test/core/util/mock_endpoint.h"
using ::grpc_event_engine::experimental::FuzzingEventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
bool squelch = true;
bool leak_check = true;
@ -48,7 +63,18 @@ static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static void dont_log(gpr_log_func_args* /*args*/) {}
DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
if (squelch) gpr_set_log_function(dont_log);
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
grpc_core::ApplyFuzzConfigVars(msg.config_vars());
grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
grpc_event_engine::experimental::SetEventEngineFactory(
[actions = msg.event_engine_actions()]() {
return std::make_unique<FuzzingEventEngine>(
FuzzingEventEngine::Options(), actions);
});
auto event_engine =
std::dynamic_pointer_cast<FuzzingEventEngine>(GetDefaultEventEngine());
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -56,12 +82,8 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
grpc_resource_quota* resource_quota =
grpc_resource_quota_create("context_list_test");
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write);
if (msg.network_input().has_single_read_bytes()) {
grpc_mock_endpoint_put_read(
mock_endpoint, grpc_slice_from_copied_buffer(
msg.network_input().single_read_bytes().data(),
msg.network_input().single_read_bytes().size()));
}
grpc_core::ScheduleReads(msg.network_input(), mock_endpoint,
event_engine.get());
grpc_server* server = grpc_server_create(nullptr, nullptr);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(server, cq, nullptr);
@ -93,6 +115,7 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
grpc_event ev;
while (true) {
event_engine->Tick();
grpc_core::ExecCtx::Get()->Flush();
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
@ -114,40 +137,39 @@ DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) {
if (call1 != nullptr) grpc_call_unref(call1);
grpc_server_shutdown_and_notify(server, cq, tag(0xdead));
grpc_server_cancel_all_calls(server);
grpc_core::Timestamp deadline =
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(5);
for (int i = 0; i <= requested_calls; i++) {
// A single grpc_completion_queue_next might not be sufficient for getting
// the tag from shutdown, because we might potentially get blocked by
// an operation happening on the timer thread.
// For example, the deadline timer might expire, leading to the timer
// thread trying to cancel the RPC and thereby acquiring a few references
// to the call. This will prevent the shutdown to complete till the timer
// thread releases those references.
// As a solution, we are going to keep performing a cq_next for a
// liberal period of 5 seconds for the timer thread to complete its work.
do {
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_OP_COMPLETE &&
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
bool got_dead = false;
while (requested_calls > 0 && !got_dead) {
event_engine->Tick();
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
if (ev.type == GRPC_OP_COMPLETE) {
switch (reinterpret_cast<uintptr_t>(ev.tag)) {
case 1:
requested_calls--;
break;
case 0xdead:
got_dead = true;
break;
}
} else if (ev.type != GRPC_QUEUE_TIMEOUT) {
grpc_core::Crash(
absl::StrCat("Unexpected cq event: ", grpc_event_string(&ev)));
}
grpc_core::ExecCtx::Get()->InvalidateNow();
}
grpc_completion_queue_shutdown(cq);
for (int i = 0; i <= requested_calls; i++) {
do {
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_QUEUE_SHUTDOWN &&
grpc_core::Timestamp::Now() < deadline);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
}
do {
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
grpc_core::ExecCtx::Get()->InvalidateNow();
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
grpc_call_details_destroy(&call_details1);
grpc_metadata_array_destroy(&request_metadata1);
grpc_server_destroy(server);
grpc_completion_queue_destroy(cq);
}
grpc_shutdown();
event_engine->TickUntilIdle();
grpc_shutdown_blocking();
event_engine->UnsetGlobalHooks();
}

@ -526,11 +526,19 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
std::move(closure));
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfterExactly(
Duration when, absl::AnyInvocable<void()> closure) {
grpc_core::MutexLock lock(&*mu_);
// (b/258949216): Cap it to one year to avoid integer overflow errors.
return RunAfterLocked(RunType::kExact, std::min(when, kOneYear),
std::move(closure));
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
const intptr_t id = next_task_id_;
++next_task_id_;
if (!task_delays_.empty()) {
if (run_type != RunType::kExact && !task_delays_.empty()) {
when += grpc_core::Clamp(task_delays_.front(), Duration::zero(),
max_delay_[static_cast<int>(run_type)]);
task_delays_.pop();

@ -112,6 +112,9 @@ class FuzzingEventEngine : public EventEngine {
ABSL_LOCKS_EXCLUDED(mu_) override;
bool Cancel(TaskHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;
TaskHandle RunAfterExactly(Duration when, absl::AnyInvocable<void()> closure)
ABSL_LOCKS_EXCLUDED(mu_);
Time Now() ABSL_LOCKS_EXCLUDED(mu_);
// Clear any global hooks installed by this event engine. Call prior to
@ -123,6 +126,7 @@ class FuzzingEventEngine : public EventEngine {
enum class RunType {
kWrite,
kRunAfter,
kExact,
};
// One pending task to be run.

@ -69,6 +69,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_mock_endpoint_put_read(
mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size));
grpc_mock_endpoint_finish_put_reads(mock_endpoint);
// Load key pair and establish server SSL credentials.
grpc_slice ca_slice, cert_slice, key_slice;

@ -41,6 +41,7 @@ class ConfigurationTest : public ::testing::Test {
protected:
ConfigurationTest() {
mock_endpoint_ = grpc_mock_endpoint_create(DiscardWrite);
grpc_mock_endpoint_finish_put_reads(mock_endpoint_);
args_ = args_.SetObject(ResourceQuota::Default());
args_ = args_.SetObject(
grpc_event_engine::experimental::GetDefaultEventEngine());

@ -23,6 +23,7 @@
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/gprpp/debug_location.h"
@ -39,6 +40,8 @@ typedef struct mock_endpoint {
grpc_slice_buffer read_buffer;
grpc_slice_buffer* on_read_out;
grpc_closure* on_read;
bool put_reads_done;
bool destroyed;
} mock_endpoint;
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
@ -86,13 +89,28 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
gpr_mu_unlock(&m->mu);
}
static void me_destroy(grpc_endpoint* ep) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
static void destroy(mock_endpoint* m) {
grpc_slice_buffer_destroy(&m->read_buffer);
gpr_mu_destroy(&m->mu);
gpr_free(m);
}
static void me_destroy(grpc_endpoint* ep) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
m->destroyed = true;
if (m->put_reads_done) {
destroy(m);
}
}
void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
m->put_reads_done = true;
if (m->destroyed) {
destroy(m);
}
}
static absl::string_view me_get_peer(grpc_endpoint* /*ep*/) {
return "fake:mock_endpoint";
}
@ -124,12 +142,15 @@ grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice)) {
gpr_mu_init(&m->mu);
m->on_write = on_write;
m->on_read = nullptr;
m->put_reads_done = false;
m->destroyed = false;
return &m->base;
}
void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
gpr_mu_lock(&m->mu);
GPR_ASSERT(!m->put_reads_done);
if (m->on_read != nullptr) {
grpc_slice_buffer_add(m->on_read_out, slice);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, absl::OkStatus());

@ -25,5 +25,6 @@
grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice));
void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice);
void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep);
#endif // GRPC_TEST_CORE_UTIL_MOCK_ENDPOINT_H

Loading…
Cancel
Save