diff --git a/src/core/lib/event_engine/windows/win_socket.cc b/src/core/lib/event_engine/windows/win_socket.cc index 61917e7ac33..6480a01d8e3 100644 --- a/src/core/lib/event_engine/windows/win_socket.cc +++ b/src/core/lib/event_engine/windows/win_socket.cc @@ -93,12 +93,20 @@ void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) { thread_pool_->Run(closure); return; }; - if (std::exchange(info.has_pending_iocp_, false)) { - thread_pool_->Run(closure); - } else { - EventEngine::Closure* prev = nullptr; - GPR_ASSERT(info.closure_.compare_exchange_strong(prev, closure)); + { + grpc_core::MutexLock lock(&info.ready_mu_); + if (!std::exchange(info.has_pending_iocp_, false)) { + // No overlapped results have been returned for this socket. Assert that + // there is no notification callback yet, set the notification callback, + // and return. + EventEngine::Closure* prev = nullptr; + GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr); + return; + } } + // Overlapped results are already available. Schedule the callback + // immediately. + thread_pool_->Run(closure); } void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) { @@ -117,13 +125,16 @@ WinSocket::OpState::OpState(WinSocket* win_socket) noexcept } void WinSocket::OpState::SetReady() { - GPR_ASSERT(!has_pending_iocp_); - auto* closure = closure_.exchange(nullptr); - if (closure) { - win_socket_->thread_pool_->Run(closure); - } else { - has_pending_iocp_ = true; + EventEngine::Closure* closure; + { + grpc_core::MutexLock lock(&ready_mu_); + GPR_ASSERT(!has_pending_iocp_); + closure = std::exchange(closure_, nullptr); + if (!closure) { + has_pending_iocp_ = true; + } } + if (closure) win_socket_->thread_pool_->Run(closure); } void WinSocket::OpState::SetError(int wsa_error) { diff --git a/src/core/lib/event_engine/windows/win_socket.h b/src/core/lib/event_engine/windows/win_socket.h index 9bb1b8d0116..9b96495d3ee 100644 --- a/src/core/lib/event_engine/windows/win_socket.h +++ b/src/core/lib/event_engine/windows/win_socket.h @@ -45,7 +45,7 @@ class WinSocket { // If a callback is already primed for notification, it will be executed via // the WinSocket's ThreadPool. Otherwise, a "pending iocp" flag will // be set. - void SetReady(); + void SetReady() ABSL_LOCKS_EXCLUDED(ready_mu_); // Set error results for a completed op void SetError(int wsa_error); // Set an OverlappedResult. Useful when WSARecv returns immediately. @@ -67,7 +67,8 @@ class WinSocket { OVERLAPPED overlapped_; WinSocket* win_socket_ = nullptr; - std::atomic closure_{nullptr}; + grpc_core::Mutex ready_mu_; + EventEngine::Closure* closure_ ABSL_GUARDED_BY(ready_mu_) = nullptr; bool has_pending_iocp_ = false; OverlappedResult result_; }; @@ -78,8 +79,10 @@ class WinSocket { // - The IOCP already completed in the background, and we need to call // the callback now. // - The IOCP hasn't completed yet, and we're queuing it for later. - void NotifyOnRead(EventEngine::Closure* on_read); - void NotifyOnWrite(EventEngine::Closure* on_write); + void NotifyOnRead(EventEngine::Closure* on_read) + ABSL_LOCKS_EXCLUDED(read_info_.ready_mu_); + void NotifyOnWrite(EventEngine::Closure* on_write) + ABSL_LOCKS_EXCLUDED(write_info_.ready_mu_); bool IsShutdown(); // Shutdown socket operations, but do not delete the WinSocket. // Connections will be disconnected, and the socket will be closed. @@ -100,7 +103,8 @@ class WinSocket { SOCKET raw_socket(); private: - void NotifyOnReady(OpState& info, EventEngine::Closure* closure); + void NotifyOnReady(OpState& info, EventEngine::Closure* closure) + ABSL_LOCKS_EXCLUDED(info.ready_mu_); SOCKET socket_; std::atomic is_shutdown_{false}; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0d4b185d5f1..5ef0142604c 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -886,12 +886,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal) { -// TODO(hork): remove when the listener flake is identified -#ifdef GPR_WINDOWS - if (grpc_core::IsEventEngineListenerEnabled()) { - gpr_log(GPR_ERROR, "cq_end_op called for tag %d (0x%p)", tag, tag); - } -#endif cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); }