Return unique_ptrs from IOCP::Watch (#31731)

* Return unique_ptrs from IOCP::Work

* Automated change: Fix sanity tests

* fix banned function

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/31735/head
AJ Heller 2 years ago committed by GitHub
parent e83d69bdf2
commit b66829429e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/lib/event_engine/windows/iocp.cc
  2. 2
      src/core/lib/event_engine/windows/iocp.h
  3. 88
      test/core/event_engine/windows/iocp_test.cc

@ -41,11 +41,11 @@ IOCP::IOCP(Executor* executor) noexcept
// Shutdown must be called prior to deletion
IOCP::~IOCP() {}
WinSocket* IOCP::Watch(SOCKET socket) {
WinSocket* wrapped_socket = new WinSocket(socket, executor_);
HANDLE ret =
CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket), iocp_handle_,
reinterpret_cast<uintptr_t>(wrapped_socket), 0);
std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
auto wrapped_socket = std::make_unique<WinSocket>(socket, executor_);
HANDLE ret = CreateIoCompletionPort(
reinterpret_cast<HANDLE>(socket), iocp_handle_,
reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
if (!ret) {
char* utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);

@ -46,7 +46,7 @@ class IOCP final : public Poller {
absl::FunctionRef<void()> schedule_poll_again) override;
void Kick() override;
WinSocket* Watch(SOCKET socket);
std::unique_ptr<WinSocket> Watch(SOCKET socket);
// Return the set of default flags
static DWORD GetDefaultSocketFlags();

@ -53,10 +53,8 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
IOCP iocp(&executor);
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
WinSocket* wrapped_client_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
WinSocket* wrapped_server_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[1]));
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
auto wrapped_server_socket = iocp.Watch(sockpair[1]);
grpc_core::Notification read_called;
grpc_core::Notification write_called;
DWORD flags = 0;
@ -78,13 +76,14 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
EXPECT_EQ(status, -1);
int last_error = WSAGetLastError();
ASSERT_EQ(last_error, WSA_IO_PENDING);
on_read = new AnyInvocableClosure([wrapped_client_socket, &read_called,
&read_wsabuf, &bytes_rcvd]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
on_read =
new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf, &bytes_rcvd]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
wrapped_client_socket->NotifyOnRead(on_read);
}
{
@ -132,8 +131,6 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
delete on_write;
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
wrapped_server_socket->MaybeShutdown(absl::OkStatus());
delete wrapped_client_socket;
delete wrapped_server_socket;
executor.Quiesce();
}
@ -142,8 +139,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
IOCP iocp(&executor);
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
WinSocket* wrapped_client_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
grpc_core::Notification read_called;
DWORD flags = 0;
AnyInvocableClosure* on_read;
@ -164,13 +160,14 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
EXPECT_EQ(status, -1);
int last_error = WSAGetLastError();
ASSERT_EQ(last_error, WSA_IO_PENDING);
on_read = new AnyInvocableClosure([wrapped_client_socket, &read_called,
&read_wsabuf, &bytes_rcvd]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
on_read =
new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
&read_called, &read_wsabuf, &bytes_rcvd]() {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(win_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Notify();
});
}
{
// Have the server send a message to the client. No need to track via IOCP
@ -198,7 +195,6 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
delete on_read;
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
delete wrapped_client_socket;
executor.Quiesce();
}
@ -257,12 +253,7 @@ TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
closesocket(sockpair[0]);
ASSERT_DEATH(
{
WinSocket* wrapped_client_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
},
"");
ASSERT_DEATH({ auto wrapped_client_socket = iocp.Watch(sockpair[0]); }, "");
executor.Quiesce();
}
@ -291,19 +282,21 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
for (int i = 0; i < sockets_per_thread; i++) {
SOCKET sockpair[2];
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
WinSocket* wrapped_client_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
WinSocket* wrapped_server_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[1]));
wrapped_client_socket->NotifyOnRead(
SelfDeletingClosure::Create([&read_count, wrapped_client_socket] {
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
auto wrapped_server_socket = iocp.Watch(sockpair[1]);
auto* pclient = wrapped_client_socket.get();
pclient->NotifyOnRead(SelfDeletingClosure::Create(
[&read_count,
win_socket = std::move(wrapped_client_socket)]() mutable {
read_count.fetch_add(1);
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
win_socket->MaybeShutdown(absl::OkStatus());
}));
wrapped_server_socket->NotifyOnWrite(
SelfDeletingClosure::Create([&write_count, wrapped_server_socket] {
auto* pserver = wrapped_server_socket.get();
pserver->NotifyOnWrite(SelfDeletingClosure::Create(
[&write_count,
win_socket = std::move(wrapped_server_socket)]() mutable {
write_count.fetch_add(1);
wrapped_server_socket->MaybeShutdown(absl::OkStatus());
win_socket->MaybeShutdown(absl::OkStatus());
}));
{
// Set the client to receive
@ -313,11 +306,10 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
read_wsabuf.buf = read_char_buffer;
DWORD bytes_rcvd;
DWORD flags = 0;
memset(wrapped_client_socket->read_info()->overlapped(), 0,
sizeof(OVERLAPPED));
int status = WSARecv(
wrapped_client_socket->socket(), &read_wsabuf, 1, &bytes_rcvd,
&flags, wrapped_client_socket->read_info()->overlapped(), NULL);
memset(pclient->read_info()->overlapped(), 0, sizeof(OVERLAPPED));
int status =
WSARecv(pclient->socket(), &read_wsabuf, 1, &bytes_rcvd, &flags,
pclient->read_info()->overlapped(), NULL);
// Expecting error 997, WSA_IO_PENDING
EXPECT_EQ(status, -1);
int last_error = WSAGetLastError();
@ -330,11 +322,9 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
write_wsabuf.len = 20;
write_wsabuf.buf = write_char_buffer;
DWORD bytes_sent;
memset(wrapped_server_socket->write_info()->overlapped(), 0,
sizeof(OVERLAPPED));
int status = WSASend(
wrapped_server_socket->socket(), &write_wsabuf, 1, &bytes_sent, 0,
wrapped_server_socket->write_info()->overlapped(), NULL);
memset(pserver->write_info()->overlapped(), 0, sizeof(OVERLAPPED));
int status = WSASend(pserver->socket(), &write_wsabuf, 1, &bytes_sent,
0, pserver->write_info()->overlapped(), NULL);
EXPECT_EQ(status, 0);
}
}

Loading…
Cancel
Save