mirror of https://github.com/grpc/grpc.git
IomgrEventEngine Redux (#29693)
* Revert "Revert "IomgrEventEngine (#29616)" (#29692)"
This reverts commit 246d13e392
.
* temporarily disable EE usage to coordinate landing
* spelling
pull/29668/head
parent
d4093799ad
commit
250b8d2dae
30 changed files with 786 additions and 32 deletions
@ -0,0 +1,67 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_HANDLE_CONTAINERS_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_HANDLE_CONTAINERS_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <cstdint> |
||||
#include <utility> |
||||
|
||||
#include "absl/container/flat_hash_set.h" |
||||
#include "absl/hash/hash.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// Used for heterogeneous lookup of TaskHandles in abseil containers.
|
||||
template <typename TaskHandle> |
||||
struct TaskHandleComparator { |
||||
struct Hash { |
||||
using HashType = std::pair<const intptr_t, const intptr_t>; |
||||
using is_transparent = void; |
||||
size_t operator()(const TaskHandle& handle) const { |
||||
return absl::Hash<HashType>()({handle.keys[0], handle.keys[1]}); |
||||
} |
||||
}; |
||||
struct Eq { |
||||
using is_transparent = void; |
||||
bool operator()(const TaskHandle& lhs, const TaskHandle& rhs) const { |
||||
return lhs.keys[0] == rhs.keys[0] && lhs.keys[1] == rhs.keys[1]; |
||||
} |
||||
}; |
||||
}; |
||||
|
||||
using TaskHandleSet = absl::flat_hash_set< |
||||
grpc_event_engine::experimental::EventEngine::TaskHandle, |
||||
TaskHandleComparator< |
||||
grpc_event_engine::experimental::EventEngine::TaskHandle>::Hash, |
||||
TaskHandleComparator< |
||||
grpc_event_engine::experimental::EventEngine::TaskHandle>::Eq>; |
||||
|
||||
using LookupTaskHandleSet = absl::flat_hash_set< |
||||
grpc_event_engine::experimental::EventEngine::DNSResolver::LookupTaskHandle, |
||||
TaskHandleComparator<grpc_event_engine::experimental::EventEngine:: |
||||
DNSResolver::LookupTaskHandle>::Hash, |
||||
TaskHandleComparator<grpc_event_engine::experimental::EventEngine:: |
||||
DNSResolver::LookupTaskHandle>::Eq>; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_HANDLE_CONTAINERS_H
|
@ -0,0 +1,206 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine.h" |
||||
|
||||
#include <string> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include "absl/cleanup/cleanup.h" |
||||
#include "absl/container/flat_hash_set.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/time/clock.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/gprpp/match.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/iomgr/executor.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
namespace { |
||||
|
||||
struct ClosureData { |
||||
grpc_timer timer; |
||||
grpc_closure closure; |
||||
absl::variant<std::function<void()>, EventEngine::Closure*> cb; |
||||
IomgrEventEngine* engine; |
||||
EventEngine::TaskHandle handle; |
||||
}; |
||||
|
||||
// Timer limits due to quirks in the iomgr implementation.
|
||||
// If deadline <= Now, the callback will be run inline, which can result in lock
|
||||
// issues. And absl::InfiniteFuture yields UB.
|
||||
absl::Time Clamp(absl::Time when) { |
||||
absl::Time max = absl::Now() + absl::Hours(8766); |
||||
absl::Time min = absl::Now() + absl::Milliseconds(2); |
||||
if (when > max) return max; |
||||
if (when < min) return min; |
||||
return when; |
||||
} |
||||
|
||||
std::string HandleToString(EventEngine::TaskHandle handle) { |
||||
return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}"); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
IomgrEventEngine::IomgrEventEngine() {} |
||||
|
||||
IomgrEventEngine::~IomgrEventEngine() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
for (auto handle : known_handles_) { |
||||
gpr_log(GPR_ERROR, |
||||
"(event_engine) IomgrEventEngine:%p uncleared TaskHandle at " |
||||
"shutdown:%s", |
||||
this, HandleToString(handle).c_str()); |
||||
} |
||||
} |
||||
GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); |
||||
} |
||||
|
||||
bool IomgrEventEngine::Cancel(EventEngine::TaskHandle handle) { |
||||
grpc_core::ExecCtx ctx; |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (!known_handles_.contains(handle)) return false; |
||||
auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]); |
||||
grpc_timer_cancel(&cd->timer); |
||||
known_handles_.erase(handle); |
||||
return true; |
||||
} |
||||
|
||||
EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when, |
||||
std::function<void()> closure) { |
||||
return RunAtInternal(when, std::move(closure)); |
||||
} |
||||
|
||||
EventEngine::TaskHandle IomgrEventEngine::RunAt(absl::Time when, |
||||
EventEngine::Closure* closure) { |
||||
return RunAtInternal(when, closure); |
||||
} |
||||
|
||||
void IomgrEventEngine::Run(std::function<void()> closure) { |
||||
RunInternal(closure); |
||||
} |
||||
|
||||
void IomgrEventEngine::Run(EventEngine::Closure* closure) { |
||||
RunInternal(closure); |
||||
} |
||||
|
||||
EventEngine::TaskHandle IomgrEventEngine::RunAtInternal( |
||||
absl::Time when, |
||||
absl::variant<std::function<void()>, EventEngine::Closure*> cb) { |
||||
when = Clamp(when); |
||||
grpc_core::ExecCtx ctx; |
||||
auto* cd = new ClosureData; |
||||
cd->cb = std::move(cb); |
||||
cd->engine = this; |
||||
GRPC_CLOSURE_INIT( |
||||
&cd->closure, |
||||
[](void* arg, grpc_error_handle error) { |
||||
auto* cd = static_cast<ClosureData*>(arg); |
||||
GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p executing callback:%s", |
||||
cd->engine, HandleToString(cd->handle).c_str()); |
||||
{ |
||||
grpc_core::MutexLock lock(&cd->engine->mu_); |
||||
cd->engine->known_handles_.erase(cd->handle); |
||||
} |
||||
auto cleaner = absl::MakeCleanup([cd] { delete cd; }); |
||||
if (error == GRPC_ERROR_CANCELLED) return; |
||||
grpc_core::Match( |
||||
cd->cb, [](EventEngine::Closure* cb) { cb->Run(); }, |
||||
[](std::function<void()> fn) { fn(); }); |
||||
}, |
||||
cd, nullptr); |
||||
// kludge to deal with realtime/monotonic clock conversion
|
||||
absl::Time absl_now = absl::Now(); |
||||
grpc_core::Duration duration = grpc_core::Duration::Milliseconds( |
||||
absl::ToInt64Milliseconds(when - absl_now) + 1); |
||||
grpc_core::ExecCtx::Get()->InvalidateNow(); |
||||
grpc_core::Timestamp when_internal = grpc_core::ExecCtx::Get()->Now() + |
||||
duration + |
||||
grpc_core::Duration::Milliseconds(1); |
||||
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd), |
||||
aba_token_.fetch_add(1)}; |
||||
grpc_core::MutexLock lock(&mu_); |
||||
known_handles_.insert(handle); |
||||
cd->handle = handle; |
||||
GRPC_EVENT_ENGINE_TRACE("IomgrEventEngine:%p scheduling callback:%s", this, |
||||
HandleToString(handle).c_str()); |
||||
grpc_timer_init(&cd->timer, when_internal, &cd->closure); |
||||
return handle; |
||||
} |
||||
|
||||
void IomgrEventEngine::RunInternal( |
||||
absl::variant<std::function<void()>, EventEngine::Closure*> cb) { |
||||
auto* cd = new ClosureData; |
||||
cd->cb = std::move(cb); |
||||
cd->engine = this; |
||||
GRPC_CLOSURE_INIT( |
||||
&cd->closure, |
||||
[](void* arg, grpc_error_handle /*error*/) { |
||||
auto* cd = static_cast<ClosureData*>(arg); |
||||
auto cleaner = absl::MakeCleanup([cd] { delete cd; }); |
||||
grpc_core::Match( |
||||
cd->cb, [](EventEngine::Closure* cb) { cb->Run(); }, |
||||
[](std::function<void()> fn) { fn(); }); |
||||
}, |
||||
cd, nullptr); |
||||
// TODO(hork): have the EE spawn dedicated closure thread(s)
|
||||
grpc_core::Executor::Run(&cd->closure, GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
std::unique_ptr<EventEngine::DNSResolver> IomgrEventEngine::GetDNSResolver( |
||||
EventEngine::DNSResolver::ResolverOptions const& /*options*/) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
bool IomgrEventEngine::IsWorkerThread() { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
bool IomgrEventEngine::CancelConnect(EventEngine::ConnectionHandle /*handle*/) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
EventEngine::ConnectionHandle IomgrEventEngine::Connect( |
||||
OnConnectCallback /*on_connect*/, const ResolvedAddress& /*addr*/, |
||||
const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/, |
||||
absl::Time /*deadline*/) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> |
||||
IomgrEventEngine::CreateListener( |
||||
Listener::AcceptCallback /*on_accept*/, |
||||
std::function<void(absl::Status)> /*on_shutdown*/, |
||||
const EndpointConfig& /*config*/, |
||||
std::unique_ptr<MemoryAllocatorFactory> /*memory_allocator_factory*/) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -0,0 +1,118 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <atomic> |
||||
#include <functional> |
||||
#include <memory> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/time/time.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/event_engine/endpoint_config.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
#include <grpc/event_engine/slice_buffer.h> |
||||
|
||||
#include "src/core/lib/event_engine/handle_containers.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class IomgrEventEngine final : public EventEngine { |
||||
public: |
||||
class IomgrEndpoint : public EventEngine::Endpoint { |
||||
public: |
||||
~IomgrEndpoint() override; |
||||
void Read(std::function<void(absl::Status)> on_read, SliceBuffer* buffer, |
||||
const ReadArgs* args) override; |
||||
void Write(std::function<void(absl::Status)> on_writable, SliceBuffer* data, |
||||
const WriteArgs* args) override; |
||||
const ResolvedAddress& GetPeerAddress() const override; |
||||
const ResolvedAddress& GetLocalAddress() const override; |
||||
}; |
||||
class IomgrListener : public EventEngine::Listener { |
||||
public: |
||||
~IomgrListener() override; |
||||
absl::StatusOr<int> Bind(const ResolvedAddress& addr) override; |
||||
absl::Status Start() override; |
||||
}; |
||||
class IomgrDNSResolver : public EventEngine::DNSResolver { |
||||
public: |
||||
~IomgrDNSResolver() override; |
||||
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, |
||||
absl::string_view name, |
||||
absl::string_view default_port, |
||||
absl::Time deadline) override; |
||||
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, |
||||
absl::string_view name, |
||||
absl::Time deadline) override; |
||||
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, |
||||
absl::string_view name, |
||||
absl::Time deadline) override; |
||||
bool CancelLookup(LookupTaskHandle handle) override; |
||||
}; |
||||
|
||||
IomgrEventEngine(); |
||||
~IomgrEventEngine() override; |
||||
|
||||
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( |
||||
Listener::AcceptCallback on_accept, |
||||
std::function<void(absl::Status)> on_shutdown, |
||||
const EndpointConfig& config, |
||||
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) |
||||
override; |
||||
|
||||
ConnectionHandle Connect(OnConnectCallback on_connect, |
||||
const ResolvedAddress& addr, |
||||
const EndpointConfig& args, |
||||
MemoryAllocator memory_allocator, |
||||
absl::Time deadline) override; |
||||
|
||||
bool CancelConnect(ConnectionHandle handle) override; |
||||
bool IsWorkerThread() override; |
||||
std::unique_ptr<DNSResolver> GetDNSResolver( |
||||
const DNSResolver::ResolverOptions& options) override; |
||||
void Run(Closure* closure) override; |
||||
void Run(std::function<void()> closure) override; |
||||
TaskHandle RunAt(absl::Time when, Closure* closure) override; |
||||
TaskHandle RunAt(absl::Time when, std::function<void()> closure) override; |
||||
bool Cancel(TaskHandle handle) override; |
||||
|
||||
private: |
||||
EventEngine::TaskHandle RunAtInternal( |
||||
absl::Time when, |
||||
absl::variant<std::function<void()>, EventEngine::Closure*> cb); |
||||
|
||||
void RunInternal( |
||||
absl::variant<std::function<void()>, EventEngine::Closure*> cb); |
||||
|
||||
grpc_core::Mutex mu_; |
||||
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); |
||||
std::atomic<intptr_t> aba_token_{0}; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_H
|
@ -0,0 +1,18 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
grpc_core::TraceFlag grpc_event_engine_trace(false, "event_engine"); |
@ -0,0 +1,30 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_TRACE_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_TRACE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
extern grpc_core::TraceFlag grpc_event_engine_trace; |
||||
|
||||
#define GRPC_EVENT_ENGINE_TRACE(format, ...) \ |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { \
|
||||
gpr_log(GPR_DEBUG, "(event_engine) " format, __VA_ARGS__); \
|
||||
} |
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_TRACE_H
|
@ -0,0 +1,31 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/iomgr_engine.h" |
||||
#include "test/core/event_engine/test_suite/event_engine_test.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
int main(int argc, char** argv) { |
||||
testing::InitGoogleTest(&argc, argv); |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
SetEventEngineFactory([]() { |
||||
return absl::make_unique< |
||||
grpc_event_engine::experimental::IomgrEventEngine>(); |
||||
}); |
||||
grpc_init(); |
||||
auto result = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return result; |
||||
} |
Loading…
Reference in new issue