[ObjC]cf event engine timer (#31140)

* Apple CF event engine using posix timer

* update generated files

* fix ci errors
pull/31836/merge
Hannah Shi 2 years ago committed by GitHub
parent 8ee542daba
commit 24e158d5a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 46
      CMakeLists.txt
  3. 21
      build_autogenerated.yaml
  4. 15
      src/core/BUILD
  5. 139
      src/core/lib/event_engine/cf_engine/cf_engine.cc
  6. 75
      src/core/lib/event_engine/cf_engine/cf_engine.h
  7. 14
      test/core/event_engine/test_suite/BUILD
  8. 43
      test/core/event_engine/test_suite/cf_event_engine_test.cc
  9. 22
      tools/run_tests/generated/tests.json

@ -148,6 +148,11 @@ config_setting(
values = {"cpu": "darwin"},
)
config_setting(
name = "mac_arm64",
values = {"cpu": "darwin_arm64"},
)
config_setting(
name = "use_strict_warning",
values = {"define": "use_strict_warning=true"},

46
CMakeLists.txt generated

@ -865,6 +865,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx cel_authorization_engine_test)
add_dependencies(buildtests_cxx certificate_provider_registry_test)
add_dependencies(buildtests_cxx certificate_provider_store_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx cf_event_engine_test)
endif()
add_dependencies(buildtests_cxx cfstream_test)
add_dependencies(buildtests_cxx channel_args_test)
add_dependencies(buildtests_cxx channel_arguments_test)
@ -7600,6 +7603,49 @@ target_link_libraries(certificate_provider_store_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(cf_event_engine_test
src/core/lib/event_engine/cf_engine/cf_engine.cc
test/core/event_engine/test_suite/cf_event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(cf_event_engine_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(cf_event_engine_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -5440,6 +5440,27 @@ targets:
- test/core/xds/certificate_provider_store_test.cc
deps:
- grpc_test_util
- name: cf_event_engine_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/cf_engine/cf_engine.h
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
src:
- src/core/lib/event_engine/cf_engine/cf_engine.cc
- test/core/event_engine/test_suite/cf_event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/timer_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
uses_polling: false
- name: cfstream_test
gtest: true
build: test

@ -1979,6 +1979,21 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "cf_event_engine",
srcs = ["lib/event_engine/cf_engine/cf_engine.cc"],
hdrs = ["lib/event_engine/cf_engine/cf_engine.h"],
deps = [
"event_engine_common",
"event_engine_trace",
"event_engine_utils",
"init_internally",
"posix_event_engine_timer_manager",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
grpc_cc_library(
name = "event_engine_tcp_socket_utils",
srcs = [

@ -0,0 +1,139 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_APPLE
#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
namespace grpc_event_engine {
namespace experimental {
struct CFEventEngine::Closure final : public EventEngine::Closure {
absl::AnyInvocable<void()> cb;
Timer timer;
CFEventEngine* engine;
EventEngine::TaskHandle handle;
void Run() override {
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p executing callback:%s", engine,
HandleToString(handle).c_str());
{
grpc_core::MutexLock lock(&engine->mu_);
engine->known_handles_.erase(handle);
}
cb();
delete this;
}
};
CFEventEngine::CFEventEngine()
: executor_(std::make_shared<ThreadPool>()), timer_manager_(executor_) {}
CFEventEngine::~CFEventEngine() {
{
grpc_core::MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
gpr_log(GPR_ERROR,
"CFEventEngine:%p uncleared TaskHandle at shutdown:%s", this,
HandleToString(handle).c_str());
}
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
timer_manager_.Shutdown();
}
executor_->Quiesce();
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CFEventEngine::CreateListener(
Listener::AcceptCallback /* on_accept */,
absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
const EndpointConfig& /* config */,
std::unique_ptr<MemoryAllocatorFactory> /* memory_allocator_factory */) {
GPR_ASSERT(false && "unimplemented");
}
CFEventEngine::ConnectionHandle CFEventEngine::Connect(
OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
const EndpointConfig& /* args */, MemoryAllocator /* memory_allocator */,
Duration /* timeout */) {
GPR_ASSERT(false && "unimplemented");
}
bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) {
GPR_ASSERT(false && "unimplemented");
}
bool CFEventEngine::IsWorkerThread() { GPR_ASSERT(false && "unimplemented"); }
std::unique_ptr<EventEngine::DNSResolver> CFEventEngine::GetDNSResolver(
const DNSResolver::ResolverOptions& /* options */) {
GPR_ASSERT(false && "unimplemented");
}
void CFEventEngine::Run(EventEngine::Closure* /* closure */) {
GPR_ASSERT(false && "unimplemented");
}
void CFEventEngine::Run(absl::AnyInvocable<void()> /* closure */) {
GPR_ASSERT(false && "unimplemented");
}
EventEngine::TaskHandle CFEventEngine::RunAfter(Duration when,
EventEngine::Closure* closure) {
return RunAfterInternal(when, [closure]() { closure->Run(); });
}
EventEngine::TaskHandle CFEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) {
return RunAfterInternal(when, std::move(closure));
}
bool CFEventEngine::Cancel(TaskHandle handle) {
grpc_core::MutexLock lock(&mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast<Closure*>(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
known_handles_.erase(handle);
if (r) delete cd;
return r;
}
EventEngine::TaskHandle CFEventEngine::RunAfterInternal(
Duration when, absl::AnyInvocable<void()> cb) {
auto when_ts = ToTimestamp(timer_manager_.Now(), when);
auto* cd = new Closure;
cd->cb = std::move(cb);
cd->engine = this;
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
aba_token_.fetch_add(1)};
grpc_core::MutexLock lock(&mu_);
known_handles_.insert(handle);
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p scheduling callback:%s", this,
HandleToString(handle).c_str());
timer_manager_.TimerInit(&cd->timer, when_ts, cd);
return handle;
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_APPLE

@ -0,0 +1,75 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CF_ENGINE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CF_ENGINE_H
#include <grpc/support/port_platform.h>
#ifdef GPR_APPLE
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/init_internally.h"
namespace grpc_event_engine {
namespace experimental {
class CFEventEngine : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
CFEventEngine();
~CFEventEngine() override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
ConnectionHandle Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
Duration timeout) override;
bool CancelConnect(ConnectionHandle handle) override;
bool IsWorkerThread() override;
std::unique_ptr<DNSResolver> GetDNSResolver(
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
bool Cancel(TaskHandle handle) override;
private:
struct Closure;
EventEngine::TaskHandle RunAfterInternal(Duration when,
absl::AnyInvocable<void()> cb);
grpc_core::Mutex mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
std::shared_ptr<ThreadPool> executor_;
TimerManager timer_manager_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_APPLE
#endif // GRPC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CF_ENGINE_H

@ -112,6 +112,20 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "cf_event_engine_test",
srcs = ["cf_event_engine_test.cc"],
tags = [
"no_linux",
"no_windows",
],
uses_polling = False,
deps = [
":timer",
"//src/core:cf_event_engine",
],
)
grpc_cc_test(
name = "fuzzing_event_engine_test",
srcs = ["fuzzing_event_engine_test.cc"],

@ -0,0 +1,43 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_APPLE
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
auto factory = []() {
return std::make_unique<grpc_event_engine::experimental::CFEventEngine>();
};
SetEventEngineFactories(factory, factory);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}
#else // GPR_APPLE
int main(int /* argc */, char** /* argv */) { return 0; }
#endif // GPR_APPLE

@ -1487,6 +1487,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "cf_event_engine_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save