[EventEngine] Lock-free fix for WinSocket/IOCP notification race (#34577)

This is a lock-free refactoring of
https://github.com/grpc/grpc/pull/34497.

---------

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/34691/head
AJ Heller 1 year ago committed by GitHub
parent e810225bc0
commit ffe6634163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/core/lib/event_engine/windows/win_socket.cc
  2. 38
      src/core/lib/event_engine/windows/win_socket.h
  3. 65
      src/core/lib/event_engine/windows/windows_endpoint.cc
  4. 2
      src/core/lib/event_engine/windows/windows_endpoint.h
  5. 75
      src/core/lib/event_engine/windows/windows_engine.cc
  6. 3
      src/core/lib/event_engine/windows/windows_engine.h
  7. 5
      src/core/lib/event_engine/windows/windows_listener.cc
  8. 45
      test/core/event_engine/windows/iocp_test.cc
  9. 109
      test/core/event_engine/windows/win_socket_test.cc

@ -93,20 +93,8 @@ void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
thread_pool_->Run(closure);
return;
};
{
grpc_core::MutexLock lock(&info.ready_mu_);
if (!std::exchange(info.has_pending_iocp_, false)) {
// No overlapped results have been returned for this socket. Assert that
// there is no notification callback yet, set the notification callback,
// and return.
EventEngine::Closure* prev = nullptr;
GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr);
return;
}
}
// Overlapped results are already available. Schedule the callback
// immediately.
thread_pool_->Run(closure);
// It is an error if any notification is already registered for this socket.
GPR_ASSERT(std::exchange(info.closure_, closure) == nullptr);
}
void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) {
@ -117,6 +105,14 @@ void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
NotifyOnReady(write_info_, on_write);
}
void WinSocket::UnregisterReadCallback() {
GPR_ASSERT(std::exchange(read_info_.closure_, nullptr) != nullptr);
}
void WinSocket::UnregisterWriteCallback() {
GPR_ASSERT(std::exchange(write_info_.closure_, nullptr) != nullptr);
}
// ---- WinSocket::OpState ----
WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
@ -125,16 +121,11 @@ WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
}
void WinSocket::OpState::SetReady() {
EventEngine::Closure* closure;
{
grpc_core::MutexLock lock(&ready_mu_);
GPR_ASSERT(!has_pending_iocp_);
closure = std::exchange(closure_, nullptr);
if (!closure) {
has_pending_iocp_ = true;
}
}
if (closure) win_socket_->thread_pool_->Run(closure);
auto* closure = std::exchange(closure_, nullptr);
// If an IOCP event is returned for a socket, and no callback has been
// registered for notification, this is invalid usage.
GPR_ASSERT(closure != nullptr);
win_socket_->thread_pool_->Run(closure);
}
void WinSocket::OpState::SetError(int wsa_error) {
@ -145,6 +136,11 @@ void WinSocket::OpState::SetResult(OverlappedResult result) {
result_ = result;
}
void WinSocket::OpState::SetErrorStatus(absl::Status error_status) {
result_ = OverlappedResult{/*wsa_error=*/0, /*bytes_transferred=*/0,
/*error_status=*/error_status};
}
void WinSocket::OpState::GetOverlappedResult() {
GetOverlappedResult(win_socket_->raw_socket());
}

@ -35,6 +35,7 @@ class WinSocket {
struct OverlappedResult {
int wsa_error;
DWORD bytes_transferred;
absl::Status error_status;
};
// State related to a Read or Write socket operation
@ -45,11 +46,14 @@ class WinSocket {
// If a callback is already primed for notification, it will be executed via
// the WinSocket's ThreadPool. Otherwise, a "pending iocp" flag will
// be set.
void SetReady() ABSL_LOCKS_EXCLUDED(ready_mu_);
// Set error results for a completed op
void SetReady();
// Set WSA error results for a completed op.
void SetError(int wsa_error);
// Set an OverlappedResult. Useful when WSARecv returns immediately.
void SetResult(OverlappedResult result);
// Set error results for a completed op.
// This is a manual override, meant to override any WSA status code.
void SetErrorStatus(absl::Status error_status);
// Retrieve the results of an overlapped operation (via Winsock API) and
// store them locally.
void GetOverlappedResult();
@ -67,22 +71,27 @@ class WinSocket {
OVERLAPPED overlapped_;
WinSocket* win_socket_ = nullptr;
grpc_core::Mutex ready_mu_;
EventEngine::Closure* closure_ ABSL_GUARDED_BY(ready_mu_) = nullptr;
bool has_pending_iocp_ = false;
EventEngine::Closure* closure_ = nullptr;
OverlappedResult result_;
};
WinSocket(SOCKET socket, ThreadPool* thread_pool) noexcept;
~WinSocket();
// Calling NotifyOnRead means either of two things:
// - The IOCP already completed in the background, and we need to call
// the callback now.
// - The IOCP hasn't completed yet, and we're queuing it for later.
void NotifyOnRead(EventEngine::Closure* on_read)
ABSL_LOCKS_EXCLUDED(read_info_.ready_mu_);
void NotifyOnWrite(EventEngine::Closure* on_write)
ABSL_LOCKS_EXCLUDED(write_info_.ready_mu_);
// Provide a closure that will be called when an IOCP completion has occurred.
//
// Notification callbacks *must be registered* before any WSASend or WSARecv
// operations are started. Only one closure can be registered at a time for
// each read or send operation.
void NotifyOnRead(EventEngine::Closure* on_read);
void NotifyOnWrite(EventEngine::Closure* on_write);
// Remove the notification callback for read/write events.
//
// This method should only be called if no IOCP event is pending for the
// socket. It is UB if an IOCP event comes through and a notification is not
// registered.
void UnregisterReadCallback();
void UnregisterWriteCallback();
bool IsShutdown();
// Shutdown socket operations, but do not delete the WinSocket.
// Connections will be disconnected, and the socket will be closed.
@ -103,8 +112,7 @@ class WinSocket {
SOCKET raw_socket();
private:
void NotifyOnReady(OpState& info, EventEngine::Closure* closure)
ABSL_LOCKS_EXCLUDED(info.ready_mu_);
void NotifyOnReady(OpState& info, EventEngine::Closure* closure);
SOCKET socket_;
std::atomic<bool> is_shutdown_{false};

@ -76,10 +76,13 @@ WindowsEndpoint::~WindowsEndpoint() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("~WindowsEndpoint::%p", this);
}
absl::Status WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
void WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", endpoint);
if (socket->IsShutdown()) {
return absl::UnavailableError("Socket is shutting down.");
socket->read_info()->SetErrorStatus(
absl::UnavailableError("Socket is shutting down."));
thread_pool->Run(&handle_read_event);
return;
}
// Prepare the WSABUF struct
GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount);
@ -101,26 +104,26 @@ absl::Status WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
socket->read_info()->SetResult(
{/*wsa_error=*/wsa_error, /*bytes_read=*/bytes_read});
thread_pool->Run(&handle_read_event);
return absl::OkStatus();
return;
}
// If the endpoint has already received some data, and the next call would
// block, return the data in case that is all the data the reader expects.
if (handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
return absl::OkStatus();
return;
}
// Otherwise, let's retry, by queuing a read.
status =
WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
&bytes_read, &flags, socket->read_info()->overlapped(), nullptr);
socket->NotifyOnRead(&handle_read_event);
status = WSARecv(socket->raw_socket(), wsa_buffers, (DWORD)buffer->Count(),
nullptr, &flags, socket->read_info()->overlapped(), nullptr);
wsa_error = status == 0 ? 0 : WSAGetLastError();
if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
// Async read returned immediately with an error
return GRPC_WSA_ERROR(
// The async read attempt returned an error immediately.
socket->UnregisterReadCallback();
socket->read_info()->SetErrorStatus(GRPC_WSA_ERROR(
wsa_error,
absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str());
absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str()));
thread_pool->Run(&handle_read_event);
}
socket->NotifyOnRead(&handle_read_event);
return absl::OkStatus();
}
bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
@ -140,13 +143,7 @@ bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
}
io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
auto status = io_state_->DoTcpRead(buffer);
if (!status.ok()) {
// The read could not be completed.
io_state_->thread_pool->Run(
[cb = io_state_->handle_read_event.ResetAndReturnCallback(),
status]() mutable { cb(status); });
}
io_state_->DoTcpRead(buffer);
return false;
}
@ -212,6 +209,8 @@ bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
}
}
auto write_info = io_state_->socket->write_info();
io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
status =
WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
(DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
@ -219,17 +218,12 @@ bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
io_state_->thread_pool->Run(
[cb = std::move(on_writable), wsa_error]() mutable {
cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
});
return false;
io_state_->socket->UnregisterWriteCallback();
io_state_->socket->write_info()->SetErrorStatus(
GRPC_WSA_ERROR(wsa_error, "WSASend"));
io_state_->thread_pool->Run(&io_state_->handle_write_event);
}
}
// As all is now setup, we can now ask for the IOCP notification. It may
// trigger the callback immediately however, but no matter.
io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
return false;
}
const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
@ -288,8 +282,12 @@ void WindowsEndpoint::HandleReadClosure::Run() {
auto io_state = std::move(io_state_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event",
io_state->endpoint);
absl::Status status;
const auto result = io_state->socket->read_info()->result();
if (!result.error_status.ok()) {
buffer_->Clear();
return ResetAndReturnCallback()(result.error_status);
}
absl::Status status;
if (result.wsa_error != 0) {
status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
buffer_->Clear();
@ -317,10 +315,7 @@ void WindowsEndpoint::HandleReadClosure::Run() {
}
// Doing another read. Let's keep the AsyncIOState alive a bit longer.
io_state_ = std::move(io_state);
status = io_state_->DoTcpRead(buffer_);
if (!status.ok()) {
return ResetAndReturnCallback()(status);
}
io_state_->DoTcpRead(buffer_);
}
bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
@ -349,6 +344,10 @@ void WindowsEndpoint::HandleWriteClosure::Run() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event",
io_state->endpoint);
const auto result = io_state->socket->write_info()->result();
if (!result.error_status.ok()) {
buffer_->Clear();
return ResetAndReturnCallback()(result.error_status);
}
absl::Status status;
if (result.wsa_error != 0) {
status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");

@ -97,7 +97,7 @@ class WindowsEndpoint : public EventEngine::Endpoint {
// Perform the low-level calls and execute the HandleReadClosure
// asynchronously.
absl::Status DoTcpRead(SliceBuffer* buffer);
void DoTcpRead(SliceBuffer* buffer);
WindowsEndpoint* const endpoint;
std::unique_ptr<WinSocket> socket;

@ -218,18 +218,21 @@ void WindowsEventEngine::OnConnectCompleted(
const auto& overlapped_result = state->socket->write_info()->result();
// return early if we cannot cancel the connection timeout timer.
if (!Cancel(state->timer_handle)) return;
if (overlapped_result.wsa_error != 0) {
if (!overlapped_result.error_status.ok()) {
state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
endpoint = overlapped_result.error_status;
} else if (overlapped_result.wsa_error != 0) {
state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
} else {
// This code should be running in a thread pool thread already, so the
// callback can be run directly.
ChannelArgsEndpointConfig cfg;
endpoint = std::make_unique<WindowsEndpoint>(
state->address, std::move(state->socket), std::move(state->allocator),
cfg, thread_pool_.get(), shared_from_this());
}
}
// This code should be running in a thread pool thread already, so the
// callback can be run directly.
cb(std::move(endpoint));
}
@ -298,43 +301,19 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect(
});
return EventEngine::ConnectionHandle::kInvalid;
}
// Connect
auto watched_socket = iocp_.Watch(sock);
auto* info = watched_socket->write_info();
bool success =
ConnectEx(watched_socket->raw_socket(), address.address(), address.size(),
nullptr, 0, nullptr, info->overlapped());
// It wouldn't be unusual to get a success immediately. But we'll still get an
// IOCP notification, so let's ignore it.
if (!success) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
Run([on_connect = std::move(on_connect),
status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
on_connect(status);
});
watched_socket->Shutdown(DEBUG_LOCATION, "ConnectEx");
return EventEngine::ConnectionHandle::kInvalid;
}
}
GPR_ASSERT(watched_socket != nullptr);
// Prepare the socket to receive a connection
auto connection_state = std::make_shared<ConnectionState>();
grpc_core::MutexLock lock(&connection_state->mu);
connection_state->socket = iocp_.Watch(sock);
GPR_ASSERT(connection_state->socket != nullptr);
auto* info = connection_state->socket->write_info();
connection_state->address = address;
connection_state->socket = std::move(watched_socket);
connection_state->on_connected_user_callback = std::move(on_connect);
connection_state->allocator = std::move(memory_allocator);
connection_state->on_connected_user_callback = std::move(on_connect);
connection_state->on_connected =
SelfDeletingClosure::Create([this, connection_state]() mutable {
OnConnectCompleted(std::move(connection_state));
});
{
grpc_core::MutexLock conn_lock(&connection_mu_);
connection_state->connection_handle =
ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()),
aba_token_.fetch_add(1)};
known_connection_handles_.insert(connection_state->connection_handle);
}
connection_state->timer_handle =
RunAfter(timeout, [this, connection_state]() {
grpc_core::MutexLock lock(&connection_state->mu);
@ -342,10 +321,36 @@ EventEngine::ConnectionHandle WindowsEventEngine::Connect(
connection_state->on_connected_user_callback(
absl::DeadlineExceededError("Connection timed out"));
}
// else: The connection attempt could not be canceled. We can assume the
// connection callback will be called.
// else: The connection attempt could not be canceled. We can assume
// the connection callback will be called.
});
// Connect
connection_state->socket->NotifyOnWrite(connection_state->on_connected);
bool success =
ConnectEx(connection_state->socket->raw_socket(), address.address(),
address.size(), nullptr, 0, nullptr, info->overlapped());
// It wouldn't be unusual to get a success immediately. But we'll still get an
// IOCP notification, so let's ignore it.
if (!success) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
if (!Cancel(connection_state->timer_handle)) {
return EventEngine::ConnectionHandle::kInvalid;
}
connection_state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx");
Run([connection_state = std::move(connection_state),
status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
grpc_core::MutexLock lock(&connection_state->mu);
connection_state->on_connected_user_callback(status);
});
return EventEngine::ConnectionHandle::kInvalid;
}
}
connection_state->connection_handle =
ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()),
aba_token_.fetch_add(1)};
grpc_core::MutexLock connection_handle_lock(&connection_mu_);
known_connection_handles_.insert(connection_state->connection_handle);
return connection_state->connection_handle;
}
@ -386,7 +391,7 @@ bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
bool WindowsEventEngine::CancelConnectInternalStateLocked(
ConnectionState* connection_state) {
connection_state->socket->Shutdown(DEBUG_LOCATION, "CancelConnect");
// Release the connection_state shared_ptr. connection_state is now invalid.
// Release the connection_state shared_ptr owned by the connected callback.
delete connection_state->on_connected;
GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s",
HandleToString<EventEngine::ConnectionHandle>(

@ -100,7 +100,8 @@ class WindowsEventEngine : public EventEngine,
grpc_core::Mutex mu
ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_);
EventEngine::ConnectionHandle connection_handle ABSL_GUARDED_BY(mu);
EventEngine::TaskHandle timer_handle ABSL_GUARDED_BY(mu);
EventEngine::TaskHandle timer_handle ABSL_GUARDED_BY(mu) =
EventEngine::TaskHandle::kInvalid;
EventEngine::OnConnectCallback on_connected_user_callback
ABSL_GUARDED_BY(mu);
EventEngine::Closure* on_connected ABSL_GUARDED_BY(mu);

@ -121,6 +121,7 @@ WindowsEventEngineListener::SinglePortSocketListener::StartLocked() {
auto error = PrepareSocket(accept_socket);
if (!error.ok()) return fail(error);
// Start the "accept" asynchronously.
io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb);
DWORD addrlen = sizeof(sockaddr_in6) + 16;
DWORD bytes_received = 0;
int success =
@ -132,13 +133,11 @@ WindowsEventEngineListener::SinglePortSocketListener::StartLocked() {
if (success != 0) {
int last_error = WSAGetLastError();
if (last_error != ERROR_IO_PENDING) {
io_state_->listener_socket->UnregisterReadCallback();
return fail(GRPC_WSA_ERROR(last_error, "AcceptEx"));
}
}
// We're ready to do the accept. Calling NotifyOnRead may immediately process
// an accept that happened in the meantime.
io_state_->accept_socket = accept_socket;
io_state_->listener_socket->NotifyOnRead(&io_state_->on_accept_cb);
GRPC_EVENT_ENGINE_TRACE(
"SinglePortSocketListener::%p listening. listener_socket::%p", this,
io_state_->listener_socket.get());

@ -75,6 +75,14 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
char read_char_buffer[2048];
read_wsabuf.buf = read_char_buffer;
DWORD bytes_rcvd;
on_read = new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
wrapped_client_socket->NotifyOnRead(on_read);
int status = WSARecv(
wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
&flags, wrapped_client_socket->read_info()->overlapped(), NULL);
@ -85,16 +93,13 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
if (last_error != WSA_IO_PENDING) {
LogErrorMessage(last_error, "WSARecv");
}
on_read = new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
wrapped_client_socket->NotifyOnRead(on_read);
}
{
on_write = new AnyInvocableClosure([&write_called] {
gpr_log(GPR_DEBUG, "Notified on write");
write_called.Notify();
});
wrapped_server_socket->NotifyOnWrite(on_write);
// Have the server send a message to the client
WSABUF write_wsabuf;
char write_char_buffer[2048] = "hello!";
@ -108,11 +113,6 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
if (status != 0) {
LogErrorMessage(WSAGetLastError(), "WSASend");
}
on_write = new AnyInvocableClosure([&write_called] {
gpr_log(GPR_DEBUG, "Notified on write");
write_called.Notify();
});
wrapped_server_socket->NotifyOnWrite(on_write);
}
// Doing work for WSASend
bool cb_invoked = false;
@ -146,11 +146,18 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
grpc_core::Notification read_called;
DWORD flags = 0;
AnyInvocableClosure* on_read;
{
// Set the client to receive asynchronously
// Prepare a notification callback, but don't register it yet.
WSABUF read_wsabuf;
wrapped_client_socket->NotifyOnRead(
SelfDeletingClosure::Create([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
}));
read_wsabuf.len = 2048;
char read_char_buffer[2048];
read_wsabuf.buf = read_char_buffer;
@ -165,13 +172,6 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
if (last_error != WSA_IO_PENDING) {
LogErrorMessage(last_error, "WSARecv");
}
on_read = new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
}
{
// Have the server send a message to the client. No need to track via IOCP
@ -195,11 +195,8 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
[&cb_invoked]() { cb_invoked = true; });
ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
ASSERT_TRUE(cb_invoked);
// register the closure, which should trigger it immediately.
wrapped_client_socket->NotifyOnRead(on_read);
// wait for the callbacks to run
read_called.WaitForNotification();
delete on_read;
wrapped_client_socket->Shutdown();
iocp.Shutdown();
thread_pool->Quiesce();

@ -27,6 +27,7 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/error.h"
#include "test/core/event_engine/windows/create_sockpair.h"
@ -38,59 +39,79 @@ using ::grpc_event_engine::experimental::ThreadPool;
using ::grpc_event_engine::experimental::WinSocket;
} // namespace
class WinSocketTest : public testing::Test {};
class WinSocketTest : public testing::Test {
public:
WinSocketTest()
: thread_pool_(grpc_event_engine::experimental::MakeThreadPool(8)) {
CreateSockpair(sockpair_, IOCP::GetDefaultSocketFlags());
wrapped_client_socket_ =
std::make_unique<WinSocket>(sockpair_[0], thread_pool_.get());
wrapped_server_socket_ =
std::make_unique<WinSocket>(sockpair_[1], thread_pool_.get());
}
~WinSocketTest() override {
wrapped_client_socket_->Shutdown();
wrapped_server_socket_->Shutdown();
thread_pool_->Quiesce();
}
protected:
std::shared_ptr<ThreadPool> thread_pool_;
SOCKET sockpair_[2];
std::unique_ptr<WinSocket> wrapped_client_socket_;
std::unique_ptr<WinSocket> wrapped_server_socket_;
};
TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) {
auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
WinSocket wrapped_client_socket(sockpair[0], thread_pool.get());
WinSocket wrapped_server_socket(sockpair[1], thread_pool.get());
bool read_called = false;
AnyInvocableClosure on_read([&read_called]() { read_called = true; });
wrapped_client_socket.NotifyOnRead(&on_read);
grpc_core::Notification read_called;
AnyInvocableClosure on_read([&read_called]() { read_called.Notify(); });
wrapped_client_socket_->NotifyOnRead(&on_read);
AnyInvocableClosure on_write([] { FAIL() << "No Write expected"; });
wrapped_client_socket.NotifyOnWrite(&on_write);
ASSERT_FALSE(read_called);
wrapped_client_socket.read_info()->SetReady();
absl::Time deadline = absl::Now() + absl::Seconds(10);
while (!read_called) {
absl::SleepFor(absl::Milliseconds(42));
if (deadline < absl::Now()) {
FAIL() << "Deadline exceeded";
}
}
ASSERT_TRUE(read_called);
wrapped_client_socket.Shutdown();
wrapped_server_socket.Shutdown();
thread_pool->Quiesce();
wrapped_client_socket_->NotifyOnWrite(&on_write);
wrapped_client_socket_->read_info()->SetReady();
read_called.WaitForNotification();
}
TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
SOCKET sockpair[2];
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
WinSocket wrapped_client_socket(sockpair[0], thread_pool.get());
wrapped_client_socket.Shutdown();
bool read_called = false;
AnyInvocableClosure closure([&wrapped_client_socket, &read_called] {
ASSERT_EQ(wrapped_client_socket.read_info()->result().bytes_transferred,
wrapped_client_socket_->Shutdown();
grpc_core::Notification read_called;
AnyInvocableClosure closure([this, &read_called] {
ASSERT_EQ(wrapped_client_socket_->read_info()->result().bytes_transferred,
0u);
ASSERT_EQ(wrapped_client_socket.read_info()->result().wsa_error,
ASSERT_EQ(wrapped_client_socket_->read_info()->result().wsa_error,
WSAESHUTDOWN);
read_called = true;
read_called.Notify();
});
wrapped_client_socket.NotifyOnRead(&closure);
absl::Time deadline = absl::Now() + absl::Seconds(3);
while (!read_called) {
absl::SleepFor(absl::Milliseconds(42));
if (deadline < absl::Now()) {
FAIL() << "Deadline exceeded";
}
}
ASSERT_TRUE(read_called);
closesocket(sockpair[1]);
thread_pool->Quiesce();
wrapped_client_socket_->NotifyOnRead(&closure);
read_called.WaitForNotification();
}
TEST_F(WinSocketTest, UnsetNotificationWorks) {
AnyInvocableClosure read_closure{
[]() { grpc_core::Crash("read callback called"); }};
wrapped_client_socket_->NotifyOnRead(&read_closure);
AnyInvocableClosure write_closure{
[]() { grpc_core::Crash("write callback called"); }};
wrapped_client_socket_->NotifyOnWrite(&write_closure);
wrapped_client_socket_->UnregisterReadCallback();
wrapped_client_socket_->UnregisterWriteCallback();
// Give this time to fail.
absl::SleepFor(absl::Seconds(1));
}
TEST_F(WinSocketTest, UnsetNotificationCanBeDoneRepeatedly) {
// This should crash if a callback is already registered.
AnyInvocableClosure closure{
[]() { grpc_core::Crash("read callback 1 called"); }};
wrapped_client_socket_->NotifyOnRead(&closure);
wrapped_client_socket_->UnregisterReadCallback();
wrapped_client_socket_->NotifyOnRead(&closure);
wrapped_client_socket_->UnregisterReadCallback();
wrapped_client_socket_->NotifyOnRead(&closure);
wrapped_client_socket_->UnregisterReadCallback();
// Give this time to fail.
absl::SleepFor(absl::Seconds(1));
}
int main(int argc, char** argv) {

Loading…
Cancel
Save