fix deadlock

pull/36709/head
AJ Heller 8 months ago
parent 2e8e4bd722
commit dd4ae2683e
  1. 14
      src/core/lib/event_engine/windows/windows_engine.cc
  2. 4
      src/core/lib/event_engine/windows/windows_engine.h

@ -83,7 +83,6 @@ WindowsEventEngine::ConnectionState::ConnectionState(
}
void WindowsEventEngine::ConnectionState::Start(Duration timeout) {
grpc_core::MutexLock lock(&mu_);
on_connected_cb_ =
std::make_unique<OnConnectedCallback>(engine_.get(), shared_from_this());
socket_->NotifyOnWrite(on_connected_cb_.get());
@ -94,7 +93,6 @@ void WindowsEventEngine::ConnectionState::Start(Duration timeout) {
EventEngine::OnConnectCallback
WindowsEventEngine::ConnectionState::TakeCallback() {
grpc_core::MutexLock lock(&mu_);
return std::exchange(on_connect_user_callback_, nullptr);
}
@ -350,6 +348,7 @@ bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
void WindowsEventEngine::OnConnectCompleted(
std::shared_ptr<ConnectionState> state) {
absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
EventEngine::OnConnectCallback cb;
{
// Connection attempt complete!
grpc_core::MutexLock lock(&state->mu());
@ -377,10 +376,10 @@ void WindowsEventEngine::OnConnectCompleted(
} else {
endpoint = state->FinishConnectingAndMakeEndpoint(thread_pool_.get());
}
cb = state->TakeCallback();
}
// This code should be running in a thread pool thread already, so the
// callback can be run directly.
auto cb = state->TakeCallback();
state.reset();
cb(std::move(endpoint));
}
@ -388,12 +387,13 @@ void WindowsEventEngine::OnConnectCompleted(
void WindowsEventEngine::OnDeadlineTimerFired(
std::shared_ptr<ConnectionState> connection_state) {
bool cancelled = false;
EventEngine::OnConnectCallback cb;
{
grpc_core::MutexLock lock(&connection_state->mu());
cancelled = CancelConnectFromDeadlineTimer(connection_state.get());
if (cancelled) cb = connection_state->TakeCallback();
}
if (cancelled) {
auto cb = connection_state->TakeCallback();
connection_state.reset();
cb(absl::DeadlineExceededError("Connection timed out"));
}
@ -527,7 +527,11 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect(
connection_state->AbortDeadlineTimer();
Run([connection_state = std::move(connection_state),
status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
auto cb = connection_state->TakeCallback();
EventEngine::OnConnectCallback cb;
{
grpc_core::MutexLock lock(&connection_state->mu());
cb = connection_state->TakeCallback();
}
connection_state.reset();
cb(std::move(status));
});

@ -120,11 +120,11 @@ class WindowsEventEngine : public EventEngine,
//
// This cannot be done in the constructor since shared_from_this is required
// for the callbacks to hold a ref to this object.
void Start(Duration timeout);
void Start(Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Returns the user's callback and resets it to nullptr to ensure it only
// runs once.
OnConnectCallback TakeCallback() ABSL_LOCKS_EXCLUDED(mu_);
OnConnectCallback TakeCallback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Create an Endpoint, transfering held object ownership to the endpoint.
//

Loading…
Cancel
Save