[event_engine] Add a maximally threaded event engine wrapper (#32691)

Add an event manager that spawns threads just as much as it possibly
can... to expose TSAN to the myriad thread ordering problems in our code
base.

Next steps for this will be to add a new test mode for tsan + thready
event engine + a few other doodads to increase threads in the system
(party.cc in particular has a good place for a hook).

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/32698/head
Craig Tiller 2 years ago committed by GitHub
parent 43c3d1f932
commit bca85495d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      CMakeLists.txt
  2. 28
      build_autogenerated.yaml
  3. 19
      test/core/event_engine/test_suite/BUILD
  4. 59
      test/core/event_engine/test_suite/thready_posix_event_engine_test.cc
  5. 33
      test/core/event_engine/thready_event_engine/BUILD
  6. 162
      test/core/event_engine/thready_event_engine/thready_event_engine.cc
  7. 106
      test/core/event_engine/thready_event_engine/thready_event_engine.h
  8. 22
      tools/run_tests/generated/tests.json

50
CMakeLists.txt generated

@ -1241,6 +1241,9 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx thread_stress_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx thready_posix_event_engine_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx time_jump_test)
endif()
@ -21172,6 +21175,53 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(thready_posix_event_engine_test
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/test_suite/event_engine_test_framework.cc
test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
test/core/event_engine/test_suite/tests/client_test.cc
test/core/event_engine/test_suite/tests/server_test.cc
test/core/event_engine/test_suite/tests/timer_test.cc
test/core/event_engine/test_suite/thready_posix_event_engine_test.cc
test/core/event_engine/thready_event_engine/thready_event_engine.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(thready_posix_event_engine_test PUBLIC cxx_std_14)
target_include_directories(thready_posix_event_engine_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(thready_posix_event_engine_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_unsecure
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -12028,6 +12028,34 @@ targets:
- linux
- posix
- mac
- name: thready_posix_event_engine_test
gtest: true
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/test_suite/event_engine_test_framework.h
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h
- test/core/event_engine/test_suite/tests/client_test.h
- test/core/event_engine/test_suite/tests/server_test.h
- test/core/event_engine/test_suite/tests/timer_test.h
- test/core/event_engine/thready_event_engine/thready_event_engine.h
src:
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/test_suite/event_engine_test_framework.cc
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
- test/core/event_engine/test_suite/tests/client_test.cc
- test/core/event_engine/test_suite/tests/server_test.cc
- test/core/event_engine/test_suite/tests/timer_test.cc
- test/core/event_engine/test_suite/thready_posix_event_engine_test.cc
- test/core/event_engine/thready_event_engine/thready_event_engine.cc
deps:
- grpc_unsecure
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: time_jump_test
gtest: true
build: test

@ -54,6 +54,25 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "thready_posix_event_engine_test",
srcs = ["thready_posix_event_engine_test.cc"],
tags = [
"no_windows",
],
uses_event_engine = True,
uses_polling = True,
deps = [
"//src/core:posix_event_engine",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",
"//test/core/event_engine/test_suite/tests:client",
"//test/core/event_engine/test_suite/tests:server",
"//test/core/event_engine/test_suite/tests:timer",
"//test/core/event_engine/thready_event_engine",
],
)
grpc_cc_test(
name = "windows_event_engine_test",
srcs = ["windows_event_engine_test.cc"],

@ -0,0 +1,59 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/experiments/config.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
#include "test/core/event_engine/test_suite/tests/client_test.h"
#include "test/core/event_engine/test_suite/tests/server_test.h"
#include "test/core/event_engine/test_suite/tests/timer_test.h"
#include "test/core/event_engine/thready_event_engine/thready_event_engine.h"
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
SetEventEngineFactories(
[]() {
return std::make_unique<
grpc_event_engine::experimental::ThreadyEventEngine>(
std::make_unique<
grpc_event_engine::experimental::PosixEventEngine>());
},
[]() {
return std::make_unique<
grpc_event_engine::experimental::ThreadyEventEngine>(
std::make_unique<
grpc_event_engine::experimental::PosixOracleEventEngine>());
});
grpc_event_engine::experimental::InitTimerTests();
grpc_event_engine::experimental::InitClientTests();
grpc_event_engine::experimental::InitServerTests();
// TODO(vigneshbabu): remove when the experiment is over
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -0,0 +1,33 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_package")
licenses(["notice"])
grpc_package(
name = "test/core/event_engine/thready_event_engine",
visibility = "tests",
)
grpc_cc_library(
name = "thready_event_engine",
srcs = ["thready_event_engine.cc"],
hdrs = ["thready_event_engine.h"],
deps = [
"//:event_engine_base_hdrs",
"//src/core:default_event_engine",
"//src/core:time",
],
)

@ -0,0 +1,162 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/event_engine/thready_event_engine/thready_event_engine.h"
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include "src/core/lib/gprpp/crash.h"
namespace grpc_event_engine {
namespace experimental {
void ThreadyEventEngine::Asynchronously(absl::AnyInvocable<void()> fn) {
std::thread([fn = std::move(fn)]() mutable { fn(); }).detach();
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
ThreadyEventEngine::CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
return impl_->CreateListener(
[this, on_accept = std::make_shared<Listener::AcceptCallback>(
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint,
MemoryAllocator memory_allocator) {
Asynchronously(
[on_accept, endpoint = std::move(endpoint),
memory_allocator = std::move(memory_allocator)]() mutable {
(*on_accept)(std::move(endpoint), std::move(memory_allocator));
});
},
[this,
on_shutdown = std::move(on_shutdown)](absl::Status status) mutable {
Asynchronously([on_shutdown = std::move(on_shutdown),
status = std::move(status)]() mutable {
on_shutdown(std::move(status));
});
},
config, std::move(memory_allocator_factory));
}
EventEngine::ConnectionHandle ThreadyEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) {
return impl_->Connect(
[this, on_connect = std::move(on_connect)](
absl::StatusOr<std::unique_ptr<Endpoint>> c) mutable {
Asynchronously(
[on_connect = std::move(on_connect), c = std::move(c)]() mutable {
on_connect(std::move(c));
});
},
addr, args, std::move(memory_allocator), timeout);
}
bool ThreadyEventEngine::CancelConnect(ConnectionHandle handle) {
return impl_->CancelConnect(handle);
}
bool ThreadyEventEngine::IsWorkerThread() {
grpc_core::Crash("we should remove this");
}
std::unique_ptr<EventEngine::DNSResolver> ThreadyEventEngine::GetDNSResolver(
const DNSResolver::ResolverOptions& options) {
return std::make_unique<ThreadyDNSResolver>(impl_->GetDNSResolver(options));
}
void ThreadyEventEngine::Run(Closure* closure) {
Run([closure]() { closure->Run(); });
}
void ThreadyEventEngine::Run(absl::AnyInvocable<void()> closure) {
Asynchronously(std::move(closure));
}
EventEngine::TaskHandle ThreadyEventEngine::RunAfter(Duration when,
Closure* closure) {
return RunAfter(when, [closure]() { closure->Run(); });
}
EventEngine::TaskHandle ThreadyEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) {
return impl_->RunAfter(when, [this, closure = std::move(closure)]() mutable {
Asynchronously(std::move(closure));
});
}
bool ThreadyEventEngine::Cancel(TaskHandle handle) {
return impl_->Cancel(handle);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupHostname(
LookupHostnameCallback on_resolve, absl::string_view name,
absl::string_view default_port, Duration timeout) {
return impl_->LookupHostname(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::vector<ResolvedAddress>> addresses) mutable {
engine_->Asynchronously([on_resolve = std::move(on_resolve),
addresses = std::move(addresses)]() mutable {
on_resolve(std::move(addresses));
});
},
name, default_port, timeout);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) {
return impl_->LookupSRV(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::vector<SRVRecord>> records) mutable {
return engine_->Asynchronously(
[on_resolve = std::move(on_resolve),
records = std::move(records)]() mutable {
on_resolve(std::move(records));
});
},
name, timeout);
}
EventEngine::DNSResolver::LookupTaskHandle
ThreadyEventEngine::ThreadyDNSResolver::LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) {
return impl_->LookupTXT(
[this, on_resolve = std::move(on_resolve)](
absl::StatusOr<std::string> record) mutable {
return engine_->Asynchronously([on_resolve = std::move(on_resolve),
record = std::move(record)]() mutable {
on_resolve(std::move(record));
});
},
name, timeout);
}
bool ThreadyEventEngine::ThreadyDNSResolver::CancelLookup(
LookupTaskHandle handle) {
return impl_->CancelLookup(handle);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,106 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H
#define GRPC_TEST_CORE_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
namespace grpc_event_engine {
namespace experimental {
// An EventEngine that spawns a thread at every available opportunity:
// - Run() spawns a thread
// - RunAfter() schedules a timer that spawns a thread to run the callback
// - Endpoint operations spawn threads and then call the underlying event engine
// functions
// Implemented as a decorator around a complete EventEngine so that it need not
// deal with OS details.
// This event engine is intended to be used for testing with TSAN to maximize
// its visibility into race conditions in the calling code.
class ThreadyEventEngine final : public EventEngine {
public:
explicit ThreadyEventEngine(std::shared_ptr<EventEngine> impl)
: impl_(std::move(impl)) {}
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
ConnectionHandle Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
Duration timeout) override;
bool CancelConnect(ConnectionHandle handle) override;
bool IsWorkerThread() override;
std::unique_ptr<DNSResolver> GetDNSResolver(
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
bool Cancel(TaskHandle handle) override;
private:
class ThreadyDNSResolver final : public DNSResolver {
public:
explicit ThreadyDNSResolver(std::unique_ptr<DNSResolver> impl)
: impl_(std::move(impl)) {}
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view name,
absl::string_view default_port,
Duration timeout) override;
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
Duration timeout) override;
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
Duration timeout) override;
bool CancelLookup(LookupTaskHandle handle) override;
private:
std::unique_ptr<DNSResolver> impl_;
ThreadyEventEngine* engine_;
};
void Asynchronously(absl::AnyInvocable<void()> fn);
std::shared_ptr<EventEngine> impl_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_TEST_CORE_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H

@ -7905,6 +7905,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "thready_posix_event_engine_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save