[EventEngine] Fix WinSocket/IOCP notification race (#34497)

It was possible for threads that call `WinSocket::NotifyOnRead` and
`WinSocket::NotifyOnWrite` to race against IOCP poller threads, causing
poller events to be missed.

In the most common usage, in some thread (E), the Endpoint would make an
async (overlapped) read or write using `WSARecv` or `WSASend`
respectively, then use the socket's `NotifyOn*` methods to have
callbacks executed when data was ready. If data was already available,
those callbacks would be scheduled for execution immediately. Meanwhile,
if overlapped events came in for some socket, some IOCP poller thread
(P) would inform the socket that data was ready, and if notification
callbacks were already present, they would be scheduled for execution
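immediately. It was possible for thread (E) to see no data available
while, at the same time, thread (P) saw no notification callback
registered. This resulted in registered callbacks that would never be
called, for data that had already been received. The fix guards both the
"pending IOCP" flag and the stored callback with a single mutex, so each
thread's check-and-set is atomic with respect to the other.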
pull/34607/head
AJ Heller 1 year ago committed by GitHub
parent 3ebdb07203
commit 2f5449d36d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      src/core/lib/event_engine/windows/win_socket.cc
  2. 14
      src/core/lib/event_engine/windows/win_socket.h
  3. 6
      src/core/lib/surface/completion_queue.cc

@ -93,12 +93,20 @@ void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
thread_pool_->Run(closure);
return;
};
if (std::exchange(info.has_pending_iocp_, false)) {
thread_pool_->Run(closure);
} else {
EventEngine::Closure* prev = nullptr;
GPR_ASSERT(info.closure_.compare_exchange_strong(prev, closure));
{
grpc_core::MutexLock lock(&info.ready_mu_);
if (!std::exchange(info.has_pending_iocp_, false)) {
// No overlapped results have been returned for this socket. Assert that
// there is no notification callback yet, set the notification callback,
// and return.
EventEngine::Closure* prev = nullptr;
GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr);
return;
}
}
// Overlapped results are already available. Schedule the callback
// immediately.
thread_pool_->Run(closure);
}
void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
@ -117,13 +125,16 @@ WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
}
void WinSocket::OpState::SetReady() {
GPR_ASSERT(!has_pending_iocp_);
auto* closure = closure_.exchange(nullptr);
if (closure) {
win_socket_->thread_pool_->Run(closure);
} else {
has_pending_iocp_ = true;
EventEngine::Closure* closure;
{
grpc_core::MutexLock lock(&ready_mu_);
GPR_ASSERT(!has_pending_iocp_);
closure = std::exchange(closure_, nullptr);
if (!closure) {
has_pending_iocp_ = true;
}
}
if (closure) win_socket_->thread_pool_->Run(closure);
}
void WinSocket::OpState::SetError(int wsa_error) {

@ -45,7 +45,7 @@ class WinSocket {
// If a callback is already primed for notification, it will be executed via
// the WinSocket's ThreadPool. Otherwise, a "pending iocp" flag will
// be set.
void SetReady();
void SetReady() ABSL_LOCKS_EXCLUDED(ready_mu_);
// Set error results for a completed op
void SetError(int wsa_error);
// Set an OverlappedResult. Useful when WSARecv returns immediately.
@ -67,7 +67,8 @@ class WinSocket {
OVERLAPPED overlapped_;
WinSocket* win_socket_ = nullptr;
std::atomic<EventEngine::Closure*> closure_{nullptr};
grpc_core::Mutex ready_mu_;
EventEngine::Closure* closure_ ABSL_GUARDED_BY(ready_mu_) = nullptr;
bool has_pending_iocp_ = false;
OverlappedResult result_;
};
@ -78,8 +79,10 @@ class WinSocket {
// - The IOCP already completed in the background, and we need to call
// the callback now.
// - The IOCP hasn't completed yet, and we're queuing it for later.
void NotifyOnRead(EventEngine::Closure* on_read);
void NotifyOnWrite(EventEngine::Closure* on_write);
void NotifyOnRead(EventEngine::Closure* on_read)
ABSL_LOCKS_EXCLUDED(read_info_.ready_mu_);
void NotifyOnWrite(EventEngine::Closure* on_write)
ABSL_LOCKS_EXCLUDED(write_info_.ready_mu_);
bool IsShutdown();
// Shutdown socket operations, but do not delete the WinSocket.
// Connections will be disconnected, and the socket will be closed.
@ -100,7 +103,8 @@ class WinSocket {
SOCKET raw_socket();
private:
void NotifyOnReady(OpState& info, EventEngine::Closure* closure);
void NotifyOnReady(OpState& info, EventEngine::Closure* closure)
ABSL_LOCKS_EXCLUDED(info.ready_mu_);
SOCKET socket_;
std::atomic<bool> is_shutdown_{false};

@ -886,12 +886,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage,
bool internal) {
// TODO(hork): remove when the listener flake is identified
#ifdef GPR_WINDOWS
if (grpc_core::IsEventEngineListenerEnabled()) {
gpr_log(GPR_ERROR, "cq_end_op called for tag %d (0x%p)", tag, tag);
}
#endif
cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
}

Loading…
Cancel
Save