mirror of https://github.com/grpc/grpc.git
Reland fuzzing event engine (#29943)
* Revert "Revert "[event_engine] Fuzzing event engine (#29926)" (#29942)"
This reverts commit cb2a92b5bb
.
* fix unused args
pull/29945/head
parent
27e6a094e1
commit
72e65b03cf
9 changed files with 455 additions and 0 deletions
@ -0,0 +1,42 @@ |
||||
# Copyright 2021 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_package") |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
grpc_package( |
||||
name = "test/core/event_engine/fuzzing_event_engine", |
||||
visibility = "tests", |
||||
) |
||||
|
||||
grpc_cc_library( |
||||
name = "fuzzing_event_engine", |
||||
srcs = ["fuzzing_event_engine.cc"], |
||||
hdrs = ["fuzzing_event_engine.h"], |
||||
deps = [ |
||||
":fuzzing_event_engine_cc_proto", |
||||
"//:event_engine_base_hdrs", |
||||
], |
||||
) |
||||
|
||||
proto_library( |
||||
name = "fuzzing_event_engine_proto", |
||||
srcs = ["fuzzing_event_engine.proto"], |
||||
) |
||||
|
||||
cc_proto_library( |
||||
name = "fuzzing_event_engine_cc_proto", |
||||
deps = ["fuzzing_event_engine_proto"], |
||||
) |
@ -0,0 +1,131 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
namespace { |
||||
const intptr_t kTaskHandleSalt = 12345; |
||||
} |
||||
|
||||
FuzzingEventEngine::FuzzingEventEngine(Options options) |
||||
: final_tick_length_(options.final_tick_length) { |
||||
for (const auto& delay : options.actions.tick_lengths()) { |
||||
tick_increments_[delay.id()] += absl::Microseconds(delay.delay_us()); |
||||
} |
||||
for (const auto& delay : options.actions.run_delay()) { |
||||
task_delays_[delay.id()] += absl::Microseconds(delay.delay_us()); |
||||
} |
||||
} |
||||
|
||||
void FuzzingEventEngine::Tick() { |
||||
std::vector<std::function<void()>> to_run; |
||||
{ |
||||
grpc_core::MutexLock lock(&mu_); |
||||
// Increment time
|
||||
auto tick_it = tick_increments_.find(current_tick_); |
||||
if (tick_it != tick_increments_.end()) { |
||||
now_ += tick_it->second; |
||||
tick_increments_.erase(tick_it); |
||||
} else if (tick_increments_.empty()) { |
||||
now_ += final_tick_length_; |
||||
} |
||||
++current_tick_; |
||||
// Find newly expired timers.
|
||||
while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) { |
||||
tasks_by_id_.erase(tasks_by_time_.begin()->second->id); |
||||
to_run.push_back(std::move(tasks_by_time_.begin()->second->closure)); |
||||
tasks_by_time_.erase(tasks_by_time_.begin()); |
||||
} |
||||
} |
||||
for (auto& closure : to_run) { |
||||
closure(); |
||||
} |
||||
} |
||||
|
||||
absl::Time FuzzingEventEngine::Now() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
return now_; |
||||
} |
||||
|
||||
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> |
||||
FuzzingEventEngine::CreateListener(Listener::AcceptCallback, |
||||
std::function<void(absl::Status)>, |
||||
const EndpointConfig&, |
||||
std::unique_ptr<MemoryAllocatorFactory>) { |
||||
abort(); |
||||
} |
||||
|
||||
EventEngine::ConnectionHandle FuzzingEventEngine::Connect( |
||||
OnConnectCallback, const ResolvedAddress&, const EndpointConfig&, |
||||
MemoryAllocator, absl::Time) { |
||||
abort(); |
||||
} |
||||
|
||||
bool FuzzingEventEngine::CancelConnect(ConnectionHandle) { abort(); } |
||||
|
||||
bool FuzzingEventEngine::IsWorkerThread() { abort(); } |
||||
|
||||
std::unique_ptr<EventEngine::DNSResolver> FuzzingEventEngine::GetDNSResolver( |
||||
const DNSResolver::ResolverOptions&) { |
||||
abort(); |
||||
} |
||||
|
||||
void FuzzingEventEngine::Run(Closure* closure) { RunAt(Now(), closure); } |
||||
|
||||
void FuzzingEventEngine::Run(std::function<void()> closure) { |
||||
RunAt(Now(), closure); |
||||
} |
||||
|
||||
EventEngine::TaskHandle FuzzingEventEngine::RunAt(absl::Time when, |
||||
Closure* closure) { |
||||
return RunAt(when, [closure]() { closure->Run(); }); |
||||
} |
||||
|
||||
EventEngine::TaskHandle FuzzingEventEngine::RunAt( |
||||
absl::Time when, std::function<void()> closure) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
const intptr_t id = next_task_id_; |
||||
++next_task_id_; |
||||
const auto delay_it = task_delays_.find(id); |
||||
// Under fuzzer configuration control, maybe make the task run later.
|
||||
if (delay_it != task_delays_.end()) { |
||||
when += delay_it->second; |
||||
task_delays_.erase(delay_it); |
||||
} |
||||
auto task = std::make_shared<Task>(id, std::move(closure)); |
||||
tasks_by_id_.emplace(id, task); |
||||
tasks_by_time_.emplace(when, std::move(task)); |
||||
return TaskHandle{id, kTaskHandleSalt}; |
||||
} |
||||
|
||||
bool FuzzingEventEngine::Cancel(TaskHandle handle) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
GPR_ASSERT(handle.keys[1] == kTaskHandleSalt); |
||||
const intptr_t id = handle.keys[0]; |
||||
auto it = tasks_by_id_.find(id); |
||||
if (it == tasks_by_id_.end()) { |
||||
return false; |
||||
} |
||||
if (it->second == nullptr) { |
||||
return false; |
||||
} |
||||
it->second = nullptr; |
||||
return true; |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,94 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H |
||||
#define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H |
||||
|
||||
#include <cstdint> |
||||
#include <map> |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// EventEngine implementation to be used by fuzzers.
|
||||
class FuzzingEventEngine : public EventEngine { |
||||
public: |
||||
struct Options { |
||||
// After all scheduled tick lengths are completed, this is the amount of
|
||||
// time Now() will be incremented each tick.
|
||||
absl::Duration final_tick_length = absl::Seconds(1); |
||||
fuzzing_event_engine::Actions actions; |
||||
}; |
||||
explicit FuzzingEventEngine(Options options); |
||||
void Tick(); |
||||
|
||||
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( |
||||
Listener::AcceptCallback on_accept, |
||||
std::function<void(absl::Status)> on_shutdown, |
||||
const EndpointConfig& config, |
||||
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) |
||||
override; |
||||
|
||||
ConnectionHandle Connect(OnConnectCallback on_connect, |
||||
const ResolvedAddress& addr, |
||||
const EndpointConfig& args, |
||||
MemoryAllocator memory_allocator, |
||||
absl::Time deadline) override; |
||||
|
||||
bool CancelConnect(ConnectionHandle handle) override; |
||||
|
||||
bool IsWorkerThread() override; |
||||
|
||||
std::unique_ptr<DNSResolver> GetDNSResolver( |
||||
const DNSResolver::ResolverOptions& options) override; |
||||
|
||||
void Run(Closure* closure) override; |
||||
void Run(std::function<void()> closure) override; |
||||
TaskHandle RunAt(absl::Time when, Closure* closure) override; |
||||
TaskHandle RunAt(absl::Time when, std::function<void()> closure) override; |
||||
bool Cancel(TaskHandle handle) override; |
||||
|
||||
absl::Time Now() ABSL_LOCKS_EXCLUDED(mu_); |
||||
|
||||
private: |
||||
struct Task { |
||||
Task(intptr_t id, std::function<void()> closure) |
||||
: id(id), closure(std::move(closure)) {} |
||||
intptr_t id; |
||||
std::function<void()> closure; |
||||
}; |
||||
|
||||
const absl::Duration final_tick_length_; |
||||
|
||||
grpc_core::Mutex mu_; |
||||
|
||||
intptr_t next_task_id_ ABSL_GUARDED_BY(mu_) = 1; |
||||
intptr_t current_tick_ ABSL_GUARDED_BY(mu_) = 0; |
||||
absl::Time now_ ABSL_GUARDED_BY(mu_) = absl::Now(); |
||||
std::map<intptr_t, absl::Duration> tick_increments_ ABSL_GUARDED_BY(mu_); |
||||
std::map<intptr_t, absl::Duration> task_delays_ ABSL_GUARDED_BY(mu_); |
||||
std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_); |
||||
std::multimap<absl::Time, std::shared_ptr<Task>> tasks_by_time_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif |
@ -0,0 +1,29 @@ |
||||
// Copyright 2021 gRPC authors. |
||||
// |
||||
// Licensed under the Apache License, Version 2.0 (the "License"); |
||||
// you may not use this file except in compliance with the License. |
||||
// You may obtain a copy of the License at |
||||
// |
||||
// http://www.apache.org/licenses/LICENSE-2.0 |
||||
// |
||||
// Unless required by applicable law or agreed to in writing, software |
||||
// distributed under the License is distributed on an "AS IS" BASIS, |
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
// See the License for the specific language governing permissions and |
||||
// limitations under the License. |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package fuzzing_event_engine; |
||||
|
||||
message Actions { |
||||
// Map of tick event (0..??) to the amount to increment the timer by. |
||||
repeated Delay tick_lengths = 1; |
||||
// Map of task id to the amount to delay execution of the task by. |
||||
repeated Delay run_delay = 2; |
||||
}; |
||||
|
||||
message Delay { |
||||
uint32 id = 1; |
||||
uint64 delay_us = 2; |
||||
}; |
@ -0,0 +1,63 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" |
||||
|
||||
#include <thread> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "test/core/event_engine/test_suite/event_engine_test.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
namespace { |
||||
|
||||
class ThreadedFuzzingEventEngine : public FuzzingEventEngine { |
||||
public: |
||||
ThreadedFuzzingEventEngine() |
||||
: FuzzingEventEngine([]() { |
||||
Options options; |
||||
options.final_tick_length = absl::Milliseconds(10); |
||||
return options; |
||||
}()), |
||||
main_([this]() { |
||||
while (!done_.load()) { |
||||
absl::SleepFor(absl::Milliseconds(10)); |
||||
Tick(); |
||||
} |
||||
}) {} |
||||
|
||||
~ThreadedFuzzingEventEngine() override { |
||||
done_.store(true); |
||||
main_.join(); |
||||
} |
||||
|
||||
private: |
||||
std::atomic<bool> done_{false}; |
||||
std::thread main_; |
||||
}; |
||||
|
||||
} // namespace
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
int main(int argc, char** argv) { |
||||
testing::InitGoogleTest(&argc, argv); |
||||
SetEventEngineFactory([]() { |
||||
return absl::make_unique< |
||||
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>(); |
||||
}); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue