mirror of https://github.com/grpc/grpc.git
Make GetDefaultEventEngine return a shared_ptr (#30280)
This works around valgrind memory leaks by giving EventEngines a fixed lifetime. We eventually want ref-counted EventEngines internally, so this is a step in the right direction as well.pull/30555/head^2
parent
82bf9d22fb
commit
4df74f2b4c
42 changed files with 614 additions and 180 deletions
@ -0,0 +1,13 @@ |
|||||||
|
{ |
||||||
|
static Posix NativeDNSResolver |
||||||
|
Memcheck:Leak |
||||||
|
match-leak-kinds: possible |
||||||
|
... |
||||||
|
fun:pthread_create@@GLIBC_2.2.5 |
||||||
|
... |
||||||
|
fun:_ZN17grpc_event_engine12experimental21GetDefaultEventEngineEv |
||||||
|
fun:_ZN9grpc_core17NativeDNSResolverC1Ev |
||||||
|
fun:_ZN9grpc_core17NativeDNSResolver11GetOrCreateEv |
||||||
|
... |
||||||
|
} |
||||||
|
|
@ -0,0 +1,129 @@ |
|||||||
|
// Copyright 2022 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <thread> |
||||||
|
|
||||||
|
#include <gmock/gmock.h> |
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/lib/event_engine/default_event_engine.h" |
||||||
|
#include "src/core/lib/gprpp/sync.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
using ::grpc_event_engine::experimental::EventEngine; |
||||||
|
using ::grpc_event_engine::experimental::GetDefaultEventEngine; |
||||||
|
|
||||||
|
class DefaultEngineTest : public testing::Test { |
||||||
|
protected: |
||||||
|
// Does nothing, fills space that a nullptr could not
|
||||||
|
class FakeEventEngine : public EventEngine { |
||||||
|
public: |
||||||
|
FakeEventEngine() = default; |
||||||
|
~FakeEventEngine() override = default; |
||||||
|
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( |
||||||
|
Listener::AcceptCallback /* on_accept */, |
||||||
|
absl::AnyInvocable<void(absl::Status)> /* on_shutdown */, |
||||||
|
const grpc_event_engine::experimental::EndpointConfig& /* config */, |
||||||
|
std::unique_ptr< |
||||||
|
grpc_event_engine::experimental:: |
||||||
|
MemoryAllocatorFactory> /* memory_allocator_factory */) |
||||||
|
override { |
||||||
|
return absl::UnimplementedError("test"); |
||||||
|
}; |
||||||
|
ConnectionHandle Connect( |
||||||
|
OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */, |
||||||
|
const grpc_event_engine::experimental::EndpointConfig& /* args */, |
||||||
|
grpc_event_engine::experimental::MemoryAllocator /* memory_allocator */, |
||||||
|
Duration /* timeout */) override { |
||||||
|
return {-1, -1}; |
||||||
|
}; |
||||||
|
bool CancelConnect(ConnectionHandle /* handle */) override { |
||||||
|
return false; |
||||||
|
}; |
||||||
|
bool IsWorkerThread() override { return false; }; |
||||||
|
std::unique_ptr<DNSResolver> GetDNSResolver( |
||||||
|
const DNSResolver::ResolverOptions& /* options */) override { |
||||||
|
return nullptr; |
||||||
|
}; |
||||||
|
void Run(Closure* /* closure */) override{}; |
||||||
|
void Run(absl::AnyInvocable<void()> /* closure */) override{}; |
||||||
|
TaskHandle RunAfter(Duration /* when */, Closure* /* closure */) override { |
||||||
|
return {-1, -1}; |
||||||
|
} |
||||||
|
TaskHandle RunAfter(Duration /* when */, |
||||||
|
absl::AnyInvocable<void()> /* closure */) override { |
||||||
|
return {-1, -1}; |
||||||
|
} |
||||||
|
bool Cancel(TaskHandle /* handle */) override { return false; }; |
||||||
|
}; |
||||||
|
}; |
||||||
|
|
||||||
|
TEST_F(DefaultEngineTest, SharedPtrGlobalEventEngineLifetimesAreValid) { |
||||||
|
int create_count = 0; |
||||||
|
grpc_event_engine::experimental::SetEventEngineFactory([&create_count] { |
||||||
|
++create_count; |
||||||
|
return absl::make_unique<FakeEventEngine>(); |
||||||
|
}); |
||||||
|
std::shared_ptr<EventEngine> ee2; |
||||||
|
{ |
||||||
|
std::shared_ptr<EventEngine> ee1 = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(1, create_count); |
||||||
|
ee2 = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(1, create_count); |
||||||
|
ASSERT_EQ(ee2.use_count(), 2); |
||||||
|
} |
||||||
|
// Ensure the first shared_ptr did not delete the global
|
||||||
|
ASSERT_TRUE(ee2.unique()); |
||||||
|
ASSERT_FALSE(ee2->IsWorkerThread()); // useful for ASAN
|
||||||
|
// destroy the global engine via the last shared_ptr, and create a new one.
|
||||||
|
ee2.reset(); |
||||||
|
ee2 = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(2, create_count); |
||||||
|
ASSERT_TRUE(ee2.unique()); |
||||||
|
grpc_event_engine::experimental::RevertToDefaultEventEngineFactory(); |
||||||
|
} |
||||||
|
|
||||||
|
TEST_F(DefaultEngineTest, StressTestSharedPtr) { |
||||||
|
constexpr int thread_count = 13; |
||||||
|
constexpr absl::Duration spin_time = absl::Seconds(3); |
||||||
|
std::vector<std::thread> threads; |
||||||
|
threads.reserve(thread_count); |
||||||
|
for (int i = 0; i < thread_count; i++) { |
||||||
|
threads.emplace_back([&spin_time] { |
||||||
|
auto timeout = absl::Now() + spin_time; |
||||||
|
do { |
||||||
|
GetDefaultEventEngine().reset(); |
||||||
|
} while (timeout > absl::Now()); |
||||||
|
}); |
||||||
|
} |
||||||
|
for (auto& thd : threads) { |
||||||
|
thd.join(); |
||||||
|
} |
||||||
|
} |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
testing::InitGoogleTest(&argc, argv); |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
grpc_init(); |
||||||
|
auto result = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return result; |
||||||
|
} |
@ -0,0 +1,79 @@ |
|||||||
|
// Copyright 2022 The gRPC Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/lib/event_engine/default_event_engine.h" |
||||||
|
#include "test/core/event_engine/util/aborting_event_engine.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
namespace { |
||||||
|
using ::grpc_event_engine::experimental::AbortingEventEngine; |
||||||
|
using ::grpc_event_engine::experimental::EventEngine; |
||||||
|
using ::grpc_event_engine::experimental::EventEngineFactoryReset; |
||||||
|
using ::grpc_event_engine::experimental::GetDefaultEventEngine; |
||||||
|
using ::grpc_event_engine::experimental::SetEventEngineFactory; |
||||||
|
|
||||||
|
class EventEngineFactoryTest : public testing::Test { |
||||||
|
public: |
||||||
|
EventEngineFactoryTest() = default; |
||||||
|
~EventEngineFactoryTest() { EventEngineFactoryReset(); } |
||||||
|
}; |
||||||
|
|
||||||
|
TEST_F(EventEngineFactoryTest, CustomFactoryIsUsed) { |
||||||
|
int counter{0}; |
||||||
|
SetEventEngineFactory([&counter] { |
||||||
|
++counter; |
||||||
|
return absl::make_unique<AbortingEventEngine>(); |
||||||
|
}); |
||||||
|
auto ee1 = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(counter, 1); |
||||||
|
auto ee2 = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(counter, 1); |
||||||
|
ASSERT_EQ(ee1, ee2); |
||||||
|
} |
||||||
|
|
||||||
|
TEST_F(EventEngineFactoryTest, FactoryResetWorks) { |
||||||
|
// eliminate a global default if one has been created already.
|
||||||
|
EventEngineFactoryReset(); |
||||||
|
int counter{0}; |
||||||
|
SetEventEngineFactory([&counter]() -> std::unique_ptr<EventEngine> { |
||||||
|
// called at most twice;
|
||||||
|
EXPECT_LE(++counter, 2); |
||||||
|
return absl::make_unique<AbortingEventEngine>(); |
||||||
|
}); |
||||||
|
auto custom_ee = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(counter, 1); |
||||||
|
auto same_ee = GetDefaultEventEngine(); |
||||||
|
ASSERT_EQ(custom_ee, same_ee); |
||||||
|
ASSERT_EQ(counter, 1); |
||||||
|
EventEngineFactoryReset(); |
||||||
|
auto default_ee = GetDefaultEventEngine(); |
||||||
|
ASSERT_NE(custom_ee, default_ee); |
||||||
|
} |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
testing::InitGoogleTest(&argc, argv); |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
grpc_init(); |
||||||
|
auto result = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return result; |
||||||
|
} |
Loading…
Reference in new issue