From fa162d08be739f342c8e8b60dc074d1c76fb2cd9 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Tue, 28 May 2024 19:17:17 -0700 Subject: [PATCH] passes //test/cpp/end2end:rls_end2end_test --- src/core/lib/event_engine/trace.h | 8 +- .../lib/event_engine/windows/win_socket.cc | 10 +- .../event_engine/windows/windows_engine.cc | 150 +++++++++++------- .../lib/event_engine/windows/windows_engine.h | 113 ++++++++----- .../event_engine/event_engine_test_utils.cc | 17 ++ .../event_engine/event_engine_test_utils.h | 8 + test/cpp/end2end/rls_end2end_test.cc | 4 +- 7 files changed, 204 insertions(+), 106 deletions(-) diff --git a/src/core/lib/event_engine/trace.h b/src/core/lib/event_engine/trace.h index 3d0a9e1d5df..a81302281d5 100644 --- a/src/core/lib/event_engine/trace.h +++ b/src/core/lib/event_engine/trace.h @@ -27,22 +27,22 @@ extern grpc_core::TraceFlag grpc_event_engine_endpoint_trace; #define GRPC_EVENT_ENGINE_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { \ - gpr_log(GPR_DEBUG, "(event_engine) " format, __VA_ARGS__); \ + gpr_log(GPR_ERROR, "(event_engine) " format, __VA_ARGS__); \ } #define GRPC_EVENT_ENGINE_ENDPOINT_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_endpoint_trace)) { \ - gpr_log(GPR_DEBUG, "(event_engine endpoint) " format, __VA_ARGS__); \ + gpr_log(GPR_ERROR, "(event_engine endpoint) " format, __VA_ARGS__); \ } #define GRPC_EVENT_ENGINE_POLLER_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_poller_trace)) { \ - gpr_log(GPR_DEBUG, "(event_engine poller) " format, __VA_ARGS__); \ + gpr_log(GPR_ERROR, "(event_engine poller) " format, __VA_ARGS__); \ } #define GRPC_EVENT_ENGINE_DNS_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_dns_trace)) { \ - gpr_log(GPR_DEBUG, "(event_engine dns) " format, __VA_ARGS__); \ + gpr_log(GPR_ERROR, "(event_engine dns) " format, __VA_ARGS__); \ } #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_TRACE_H diff --git a/src/core/lib/event_engine/windows/win_socket.cc b/src/core/lib/event_engine/windows/win_socket.cc index d6a75770a4b..a9e360485ff 100644 --- a/src/core/lib/event_engine/windows/win_socket.cc +++ b/src/core/lib/event_engine/windows/win_socket.cc @@ -69,13 +69,15 @@ void WinSocket::Shutdown() { sizeof(guid), &DisconnectEx, sizeof(DisconnectEx), &ioctl_num_bytes, NULL, NULL); - if (status == 0) { - DisconnectEx(socket_, NULL, 0, 0); - } else { + if (status != 0) { char* utf8_message = gpr_format_message(WSAGetLastError()); - gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s", + gpr_log(GPR_ERROR, "Unable to retrieve DisconnectEx pointer : %s", utf8_message); gpr_free(utf8_message); + } else if (DisconnectEx(socket_, NULL, 0, 0) != 0) { + char* utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "DisconnectEx failed : %s", utf8_message); + gpr_free(utf8_message); } closesocket(socket_); GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this); diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 33a2a89d930..66b9b818641 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -80,47 +80,55 @@ WindowsEventEngine::ConnectionState::ConnectionState( CHECK(socket_ != nullptr); connection_handle_ = ConnectionHandle{reinterpret_cast(this), engine_->aba_token_.fetch_add(1)}; + LOG(ERROR) << "DO NOT SUBMIT: ConnectionState owned engine::" + << engine_.use_count(); } void WindowsEventEngine::ConnectionState::Start(Duration timeout) { - socket_->NotifyOnWrite(MakeOnConnectedCallback()); - timer_handle_ = engine_->RunAfter(timeout, MakeDeadlineTimerCallback()); -} - -void WindowsEventEngine::ConnectionState::RunUserCallback( - absl::StatusOr> error) ABSL_LOCKS_EXCLUDED(mu_) { - EventEngine::OnConnectCallback cb; - { - grpc_core::MutexLock lock(&mu_); - cb = std::exchange(on_connect_user_callback_, nullptr); - } - cb(std::move(error)); -} - -EventEngine::Closure* -WindowsEventEngine::ConnectionState::MakeOnConnectedCallback() { on_connected_cb_ = - std::make_unique(engine_, shared_from_this()); - return on_connected_cb_.get(); + std::make_unique(engine_.get(), shared_from_this()); + socket_->NotifyOnWrite(on_connected_cb_.get()); + deadline_timer_cb_ = std::make_unique( + engine_.get(), shared_from_this()); + timer_handle_ = engine_->RunAfter(timeout, deadline_timer_cb_.get()); } -EventEngine::Closure* -WindowsEventEngine::ConnectionState::MakeDeadlineTimerCallback() { - deadline_timer_cb_ = - std::make_unique(engine_, shared_from_this()); - return deadline_timer_cb_.get(); +EventEngine::OnConnectCallback +WindowsEventEngine::ConnectionState::TakeCallback() { + grpc_core::MutexLock lock(&mu_); + return std::exchange(on_connect_user_callback_, nullptr); } std::unique_ptr -WindowsEventEngine::ConnectionState::MakeEndpoint(ThreadPool* thread_pool) { +WindowsEventEngine::ConnectionState::FinishConnectingAndMakeEndpoint( + ThreadPool* thread_pool) { ChannelArgsEndpointConfig cfg; + // DO NOT SUBMIT: both callbacks should have been called or destroyed. + // Need to assert that the deadline timer is cancelled + if (!engine_->Cancel(timer_handle_)) { + LOG(ERROR) << "DO NOT SUBMIT: deadline timer may still run"; + } else { + LOG(ERROR) << "DO NOT SUBMIT: deadline timer cancelled, refs are released"; + deadline_timer_cb_.reset(); + } return std::make_unique(address_, std::move(socket_), std::move(allocator_), cfg, thread_pool, engine_); } +void WindowsEventEngine::ConnectionState::AbortOnConnect() { + LOG(ERROR) << "DO NOT SUBMIT: AbortOnConnect"; + on_connected_cb_.reset(); +} + +void WindowsEventEngine::ConnectionState::AbortDeadlineTimer() { + LOG(ERROR) << "DO NOT SUBMIT: AbortDeadlineTimer"; + deadline_timer_cb_.reset(); +} + void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() { - CHECK_NE(connection_state_, nullptr) + LOG(ERROR) << "DO NOT SUBMIT: OnConnectedCallback::Run"; + DCHECK_NE(connection_state_, nullptr) << "ConnectionState::OnConnectedCallback::" << this << " has already run. It should only ever run once."; bool has_run; @@ -131,16 +139,24 @@ void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() { // This could race with the deadline timer. If so, the OnConnectCompleted // callback should not run, and the refs should be released. if (has_run) { + LOG(ERROR) << "DO NOT SUBMIT: OnConnectedCallback resetting " + "connection_state_"; connection_state_.reset(); - engine_.reset(); return; } engine_->OnConnectCompleted(std::move(connection_state_)); - // Reset to release the ref. - engine_.reset(); } +// DO NOT SUBMIT: Need to get the deadline timer to release its ref if it's +// never going to be called. bazel test --test_output=streamed +// --config=windows_dbg --runs_per_test=1 --test_env=grpc_verbosity=debug +// --test_env=GRPC_EXPERIMENTS=event_engine_client +// --test_env=GRPC_TRACE=event_engine +// --test_filter='*RlsEnd2endTest.ConnectivityStateTransientFailure*' +// --test_arg=--v=10 //test/cpp/end2end:rls_end2end_test + void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() { + LOG(ERROR) << "DO NOT SUBMIT: DeadlineTimerCallback::Run"; CHECK_NE(connection_state_, nullptr) << "ConnectionState::DeadlineTimerCallback::" << this << " has already run. It should only ever run once."; @@ -152,13 +168,12 @@ void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() { // This could race with the deadline timer. If so, the OnConnectCompleted // callback should not run, and the refs should be released. if (has_run) { + LOG(ERROR) << "DO NOT SUBMIT: DeadlineTimerCallback resetting " + "connection_state_"; connection_state_.reset(); - engine_.reset(); return; } engine_->OnDeadlineTimerFired(std::move(connection_state_)); - // Reset to release the ref. - engine_.reset(); } // ---- IOCPWorkClosure ---- @@ -358,7 +373,6 @@ bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); } void WindowsEventEngine::OnConnectCompleted( std::shared_ptr state) { absl::StatusOr> endpoint; - EventEngine::OnConnectCallback cb; { // Connection attempt complete! grpc_core::MutexLock lock(&state->mu()); @@ -376,20 +390,26 @@ void WindowsEventEngine::OnConnectCompleted( "Not accepting connection since the deadline timer has fired"); return; } + // Release refs held by the deadline timer. + state->AbortDeadlineTimer(); const auto& overlapped_result = state->socket()->write_info()->result(); if (!overlapped_result.error_status.ok()) { state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure"); endpoint = overlapped_result.error_status; + LOG(ERROR) << "DO NOT SUBMIT: error on connect: " << endpoint.status(); } else if (overlapped_result.wsa_error != 0) { state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure"); endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx"); + LOG(ERROR) << "DO NOT SUBMIT: error on connect: " << endpoint.status(); } else { - endpoint = state->MakeEndpoint(thread_pool_.get()); + endpoint = state->FinishConnectingAndMakeEndpoint(thread_pool_.get()); } } // This code should be running in a thread pool thread already, so the // callback can be run directly. - state->RunUserCallback(std::move(endpoint)); + auto cb = state->TakeCallback(); + state.reset(); + cb(std::move(endpoint)); } void WindowsEventEngine::OnDeadlineTimerFired( @@ -400,8 +420,9 @@ void WindowsEventEngine::OnDeadlineTimerFired( cancelled = CancelConnectFromDeadlineTimer(connection_state.get()); } if (cancelled) { - connection_state->RunUserCallback( - absl::DeadlineExceededError("Connection timed out")); + auto cb = connection_state->TakeCallback(); + connection_state.reset(); + cb(absl::DeadlineExceededError("Connection timed out")); return; } GRPC_EVENT_ENGINE_TRACE( @@ -509,29 +530,39 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect( address.size(), nullptr, 0, nullptr, info->overlapped()); // It wouldn't be unusual to get a success immediately. But we'll still get an // IOCP notification, so let's ignore it. - if (!success) { - int last_error = WSAGetLastError(); - if (last_error != ERROR_IO_PENDING) { - if (!Cancel(connection_state->timer_handle())) { - return EventEngine::ConnectionHandle::kInvalid; - } - connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx"); - { - grpc_core::MutexLock connection_handle_lock(&connection_mu_); - CHECK(known_connection_handles_.erase( - connection_state->connection_handle()) == 1); - } - Run([connection_state = std::move(connection_state), - status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable { - connection_state->RunUserCallback(status); - }); - return EventEngine::ConnectionHandle::kInvalid; - } + if (success) return connection_state->connection_handle(); + // Otherwise, we need to handle an error or IO Event. + int last_error = WSAGetLastError(); + if (last_error == ERROR_IO_PENDING) { + // Overlapped I/O operation is in progress. + return connection_state->connection_handle(); } - return connection_state->connection_handle(); + // Abort the connection. + // The on-connect callback won't run, so we must clean up its state. + connection_state->AbortOnConnect(); + { + grpc_core::MutexLock connection_handle_lock(&connection_mu_); + CHECK(known_connection_handles_.erase( + connection_state->connection_handle()) == 1); + } + connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx"); + if (!Cancel(connection_state->timer_handle())) { + // The deadline timer will run, or is running. + return EventEngine::ConnectionHandle::kInvalid; + } + // The deadline timer won't run, so we must clean up its state. + connection_state->AbortDeadlineTimer(); + Run([connection_state = std::move(connection_state), + status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable { + auto cb = connection_state->TakeCallback(); + connection_state.reset(); + cb(std::move(status)); + }); + return EventEngine::ConnectionHandle::kInvalid; } bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { + LOG(ERROR) << "DO NOT SUBMIT: CancelConnect for handle " << handle; if (handle == EventEngine::ConnectionHandle::kInvalid) { GRPC_EVENT_ENGINE_TRACE("%s", "Attempted to cancel an invalid connection handle"); @@ -540,17 +571,20 @@ bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { // Erase the connection handle, which may be unknown { grpc_core::MutexLock lock(&connection_mu_); - if (!known_connection_handles_.contains(handle)) { + if (known_connection_handles_.erase(handle) != 1) { GRPC_EVENT_ENGINE_TRACE( "Unknown connection handle: %s", HandleToString(handle).c_str()); return false; } - known_connection_handles_.erase(handle); } auto* connection_state = reinterpret_cast(handle.keys[0]); grpc_core::MutexLock state_lock(&connection_state->mu()); + // The connection cannot be cancelled if the deadline timer is already firing. if (!Cancel(connection_state->timer_handle())) return false; + // The deadline timer was cancelled, so we must clean up its state. The + // on-connect callback will run when the + connection_state->AbortDeadlineTimer(); return CancelConnectInternalStateLocked(connection_state); } @@ -561,6 +595,8 @@ bool WindowsEventEngine::CancelConnectFromDeadlineTimer( grpc_core::MutexLock lock(&connection_mu_); if (known_connection_handles_.erase( connection_state->connection_handle()) != 1) { + LOG(ERROR) << "DO NOT SUBMIT: could not find connection handle, " + "OnConnectCompleted must be running already"; return false; } } diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 0ef63ae1206..fa206dd0aed 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -23,6 +23,7 @@ #include #include +#include "absl/log/log.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" @@ -45,8 +46,6 @@ namespace grpc_event_engine { namespace experimental { -// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that -// EventEngine is shut down before we shut down iomgr. class WindowsEventEngine : public EventEngine, public grpc_core::KeepsGrpcInitialized { public: @@ -106,9 +105,11 @@ class WindowsEventEngine : public EventEngine, IOCP* poller() { return &iocp_; } private: - // State of an active connection. - // Managed by a shared_ptr, owned exclusively by the timeout callback and the - // OnConnectCompleted callback herein. + // The state of an active connection. + // + // This object is managed by a shared_ptr, which is owned by: + // 1) the deadline timer callback, and + // 2) the OnConnectCompleted callback. class ConnectionState : public std::enable_shared_from_this { public: ConnectionState(std::shared_ptr engine, @@ -116,16 +117,34 @@ class WindowsEventEngine : public EventEngine, EventEngine::ResolvedAddress address, MemoryAllocator allocator, EventEngine::OnConnectCallback on_connect_user_callback); + ~ConnectionState() { + LOG(ERROR) << "DO NOT SUBMIT: ~ConnectionState. ee count::" + << engine_.use_count() - 1; + } // Starts the deadline timer, and sets up the socket to notify on writes. + // + // This cannot be done in the constructor since shared_from_this is required + // for the callbacks to hold a ref to this object. void Start(Duration timeout); - // Runs the user callback and resets it to nullptr to ensure it only runs + // Returns the user's callback and resets it to nullptr to ensure it only runs // once. - void RunUserCallback(absl::StatusOr> error) - ABSL_LOCKS_EXCLUDED(mu_); - - std::unique_ptr MakeEndpoint(ThreadPool* thread_pool); + // DO NOT SUBMIT: this probably should not be necessary. + OnConnectCallback TakeCallback() ABSL_LOCKS_EXCLUDED(mu_); + + // Create an Endpoint, transfering held object ownership to the endpoint. + // + // This can only be called once, and the connection state is no longer valid + // after an endpoint has been created. + // DO NOT SUBMIT: is this a good API? Who should own this responsibility? + std::unique_ptr FinishConnectingAndMakeEndpoint( + ThreadPool* thread_pool); + + // Release all refs to the on-connect callback. + void AbortOnConnect(); + // Release all refs to the deadline timer callback. + void AbortDeadlineTimer(); WinSocket* socket() { return socket_.get(); } @@ -140,68 +159,82 @@ class WindowsEventEngine : public EventEngine, friend std::ostream& operator<<(std::ostream& out, const ConnectionState& connection_state); - // Creates the notify on write callback. - // Holds refs to the ConnectionState and WindowsEventEngine - EventEngine::Closure* MakeOnConnectedCallback() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - - // Creates the deadline timer callback. - // Holds refs to the ConnectionState and WindowsEventEngine - EventEngine::Closure* MakeDeadlineTimerCallback() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - + // Stateful closure for the endpoint's on-connect callback. + // + // Once created, this closure must be Run or deleted to release the held + // refs. class OnConnectedCallback : public EventEngine::Closure { public: - OnConnectedCallback(std::shared_ptr engine, + OnConnectedCallback(WindowsEventEngine* engine, std::shared_ptr connection_state) - : engine_(std::move(engine)), - connection_state_(std::move(connection_state)) {} + : engine_(engine), connection_state_(std::move(connection_state)) {} + ~OnConnectedCallback() override { + LOG(ERROR) << "DO NOT SUBMIT: ~OnConnectedCallback"; + } - // Runs the WindowsEngine's OnConnectCompleted if the deadline timer + // Runs the WindowsEventEngine's OnConnectCompleted if the deadline timer // hasn't fired first. void Run(); private: - std::shared_ptr engine_; + WindowsEventEngine* engine_; std::shared_ptr connection_state_; }; + // Stateful closure for the deadline timer. + // + // Once created, this closure must be Run or deleted to release the held + // refs. class DeadlineTimerCallback : public EventEngine::Closure { public: - DeadlineTimerCallback(std::shared_ptr engine, + DeadlineTimerCallback(WindowsEventEngine* engine, std::shared_ptr connection_state) - : engine_(std::move(engine)), - connection_state_(std::move(connection_state)) {} + : engine_(engine), connection_state_(std::move(connection_state)) {} + ~DeadlineTimerCallback() override { + LOG(ERROR) << "DO NOT SUBMIT: ~DeadlineTimerCallback"; + } - // Runs the WindowsEngine's OnDeadlineTimerFired if the deadline timer - // hasn't fired first. + // Runs the WindowsEventEngine's OnDeadlineTimerFired if the deadline + // timer hasn't fired first. void Run(); private: - std::shared_ptr engine_; + WindowsEventEngine* engine_; std::shared_ptr connection_state_; }; // everything is guarded by mu_; grpc_core::Mutex mu_ ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_); + // Endpoint connection state. std::unique_ptr socket_ ABSL_GUARDED_BY(mu_); EventEngine::ResolvedAddress address_ ABSL_GUARDED_BY(mu_); MemoryAllocator allocator_ ABSL_GUARDED_BY(mu_); - std::shared_ptr engine_ ABSL_GUARDED_BY(mu_); EventEngine::OnConnectCallback on_connect_user_callback_ ABSL_GUARDED_BY(mu_); - EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_); - EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) = - EventEngine::TaskHandle::kInvalid; + // This guarantees the EventEngine survives long enough to execute these + // deadline timer or on-connect callbacks. + std::shared_ptr engine_ ABSL_GUARDED_BY(mu_); + // Owned closures. These hold refs to this object. std::unique_ptr on_connected_cb_ ABSL_GUARDED_BY(mu_); std::unique_ptr deadline_timer_cb_ ABSL_GUARDED_BY(mu_); + // Their respective method handles. + EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_) = + EventEngine::ConnectionHandle::kInvalid; + EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) = + EventEngine::TaskHandle::kInvalid; + // Flag to ensure that only one of the even closures will complete its + // responsibilities. bool has_run_ ABSL_GUARDED_BY(mu_) = false; }; + + // Required for the function to see the private ConnectionState type. friend std::ostream& operator<<(std::ostream& out, const ConnectionState& connection_state); + struct TimerClosure; + // A poll worker which schedules itself unless kicked class IOCPWorkClosure : public EventEngine::Closure { public: @@ -217,13 +250,16 @@ class WindowsEventEngine : public EventEngine, IOCP* iocp_; }; + // Called via IOCP notifications when a connection is ready to be processed. + // Either this or the deadline timer will run, never both. void OnConnectCompleted(std::shared_ptr state); + // Called after a timeout when no connection has been established. + // Either this or the on-connect callback will run, never both. void OnDeadlineTimerFired(std::shared_ptr state); - // CancelConnect called from within the deadline timer. - // In this case, the connection_state->mu_ is already locked, and timer - // cancellation is not possible. + // CancelConnect, called from within the deadline timer. + // Timer cancellation is not possible. bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state) ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu_); @@ -232,7 +268,6 @@ class WindowsEventEngine : public EventEngine, bool CancelConnectInternalStateLocked(ConnectionState* connection_state) ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu_); - struct TimerClosure; EventEngine::TaskHandle RunAfterInternal(Duration when, absl::AnyInvocable cb); grpc_core::Mutex task_mu_; diff --git a/test/core/event_engine/event_engine_test_utils.cc b/test/core/event_engine/event_engine_test_utils.cc index 32ab6ac5276..5e3924d063a 100644 --- a/test/core/event_engine/event_engine_test_utils.cc +++ b/test/core/event_engine/event_engine_test_utils.cc @@ -38,6 +38,7 @@ #include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/resource_quota/memory_quota.h" @@ -88,6 +89,22 @@ void WaitForSingleOwner(std::shared_ptr engine) { } } +void WaitForSingleOwner(std::shared_ptr engine, + EventEngine::Duration timeout) { + int n = 0; + auto start = std::chrono::system_clock::now(); + while (engine.use_count() > 1) { + ++n; + if (n % 100 == 0) AsanAssertNoLeaks(); + if (std::chrono::system_clock::now() - start > timeout) { + grpc_core::Crash("Timed out waiting for a single EventEngine owner"); + } + GRPC_LOG_EVERY_N_SEC(2, GPR_INFO, "engine.use_count() = %ld", + engine.use_count()); + absl::SleepFor(absl::Milliseconds(100)); + } +} + void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) { buf->Append(Slice::FromCopiedString(data)); } diff --git a/test/core/event_engine/event_engine_test_utils.h b/test/core/event_engine/event_engine_test_utils.h index debef7a0449..e1d2fc61dd2 100644 --- a/test/core/event_engine/event_engine_test_utils.h +++ b/test/core/event_engine/event_engine_test_utils.h @@ -52,6 +52,14 @@ std::string GetNextSendMessage(); // Usage: WaitForSingleOwner(std::move(engine)) void WaitForSingleOwner(std::shared_ptr engine); +// Waits until the use_count of the EventEngine shared_ptr has reached 1 +// and returns. +// Callers must give up their ref, or this method will block forever. +// This version will CRASH after the given timeout +// Usage: WaitForSingleOwner(std::move(engine), 30s) +void WaitForSingleOwner(std::shared_ptr engine, + EventEngine::Duration timeout); + // A helper method to exchange data between two endpoints. It is assumed // that both endpoints are connected. The data (specified as a string) is // written by the sender_endpoint and read by the receiver_endpoint. It diff --git a/test/cpp/end2end/rls_end2end_test.cc b/test/cpp/end2end/rls_end2end_test.cc index ca6616fe6b7..c376d9787d4 100644 --- a/test/cpp/end2end/rls_end2end_test.cc +++ b/test/cpp/end2end/rls_end2end_test.cc @@ -176,8 +176,8 @@ class RlsEnd2endTest : public ::testing::Test { static void TearDownTestSuite() { grpc_shutdown_blocking(); - WaitForSingleOwner( - grpc_event_engine::experimental::GetDefaultEventEngine()); + WaitForSingleOwner(grpc_event_engine::experimental::GetDefaultEventEngine(), + std::chrono::seconds(10)); grpc_core::CoreConfiguration::Reset(); }