[EventEngine] Windows endpoints keep their EventEngines alive (#32560)

Discovered via `bazel test
--test_env=GRPC_EXPERIMENTS=event_engine_client
//test/core/iomgr:endpoint_pair_test`. CI experiments can be enabled
generally on Windows once a few fixes and improvements are completed.
pull/32568/head
AJ Heller 2 years ago committed by GitHub
parent cd4154f21d
commit 1548038a09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      src/core/lib/event_engine/windows/windows_endpoint.cc
  2. 3
      src/core/lib/event_engine/windows/windows_endpoint.h
  3. 3
      src/core/lib/event_engine/windows/windows_engine.cc
  4. 2
      src/core/lib/event_engine/windows/windows_listener.cc
  5. 4
      test/core/event_engine/test_suite/tests/server_test.cc
  6. 15
      test/core/event_engine/windows/windows_endpoint_test.cc

@ -42,11 +42,13 @@ constexpr int kMaxWSABUFCount = 16;
WindowsEndpoint::WindowsEndpoint(
const EventEngine::ResolvedAddress& peer_address,
std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
const EndpointConfig& /* config */, Executor* executor)
const EndpointConfig& /* config */, Executor* executor,
std::shared_ptr<EventEngine> engine)
: peer_address_(peer_address),
allocator_(std::move(allocator)),
executor_(executor),
io_state_(std::make_shared<AsyncIOState>(this, std::move(socket))) {
io_state_(std::make_shared<AsyncIOState>(this, std::move(socket))),
engine_(engine) {
char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
int addr_len = sizeof(addr);
if (getsockname(io_state_->socket->raw_socket(),
@ -62,7 +64,8 @@ WindowsEndpoint::WindowsEndpoint(
}
WindowsEndpoint::~WindowsEndpoint() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p destoyed", this);
io_state_->socket->Shutdown(DEBUG_LOCATION, "~WindowsEndpoint");
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p destroyed", this);
}
bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
@ -304,16 +307,16 @@ void WindowsEndpoint::HandleWriteClosure::Run() {
// Deletes the shared_ptr when this closure returns
auto io_state = std::move(io_state_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event",
io_state_->endpoint);
io_state->endpoint);
auto cb = std::move(cb_);
const auto result = io_state->socket->write_info()->result();
Reset();
absl::Status status;
if (result.wsa_error != 0) {
status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");
} else {
GPR_ASSERT(result.bytes_transferred == buffer_->Length());
}
Reset();
cb(status);
}

@ -29,7 +29,7 @@ class WindowsEndpoint : public EventEngine::Endpoint {
WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address,
std::unique_ptr<WinSocket> socket,
MemoryAllocator&& allocator, const EndpointConfig& config,
Executor* Executor);
Executor* Executor, std::shared_ptr<EventEngine> engine);
~WindowsEndpoint() override;
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
@ -93,6 +93,7 @@ class WindowsEndpoint : public EventEngine::Endpoint {
MemoryAllocator allocator_;
Executor* executor_;
std::shared_ptr<AsyncIOState> io_state_;
std::shared_ptr<EventEngine> engine_;
};
} // namespace experimental

@ -107,6 +107,7 @@ WindowsEventEngine::WindowsEventEngine()
}
WindowsEventEngine::~WindowsEventEngine() {
GRPC_EVENT_ENGINE_TRACE("~WindowsEventEngine::%p", this);
{
grpc_core::MutexLock lock(&task_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
@ -207,7 +208,7 @@ void WindowsEventEngine::OnConnectCompleted(
ChannelArgsEndpointConfig cfg;
endpoint = std::make_unique<WindowsEndpoint>(
state->address, std::move(state->socket), std::move(state->allocator),
cfg, executor_.get());
cfg, executor_.get(), shared_from_this());
}
}
cb(std::move(endpoint));

@ -200,7 +200,7 @@ void WindowsEventEngineListener::SinglePortSocketListener::
peer_address, listener_->iocp_->Watch(io_state_->accept_socket),
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrFormat("listener endpoint %s", peer_name)),
listener_->config_, listener_->executor_);
listener_->config_, listener_->executor_, listener_->engine_);
listener_->accept_cb_(
std::move(endpoint),
listener_->memory_allocator_factory_->CreateMemoryAllocator(

@ -100,7 +100,7 @@ TEST_F(EventEngineServerTest, CannotBindAfterStarted) {
// equals data read at the other end of the stream.
TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
grpc_core::ExecCtx ctx;
auto oracle_ee = this->NewOracleEventEngine();
std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
@ -175,7 +175,7 @@ TEST_F(EventEngineServerTest,
grpc_core::ExecCtx ctx;
static constexpr int kNumListenerAddresses = 10; // N
static constexpr int kNumConnections = 10; // M
auto oracle_ee = this->NewOracleEventEngine();
std::shared_ptr<EventEngine> oracle_ee(this->NewOracleEventEngine());
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::unique_ptr<EventEngine::Endpoint> server_endpoint;

@ -27,6 +27,7 @@
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/event_engine/windows/windows_engine.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "test/core/event_engine/windows/create_sockpair.h"
@ -49,14 +50,15 @@ TEST_F(WindowsEndpointTest, BasicCommunication) {
auto wrapped_client_socket = iocp.Watch(sockpair[0]);
auto wrapped_server_socket = iocp.Watch(sockpair[1]);
sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
auto engine = std::make_shared<WindowsEventEngine>();
EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
sizeof(loopback_addr));
WindowsEndpoint client(addr, std::move(wrapped_client_socket),
quota.CreateMemoryAllocator("client"),
ChannelArgsEndpointConfig(), &executor);
ChannelArgsEndpointConfig(), &executor, engine);
WindowsEndpoint server(addr, std::move(wrapped_server_socket),
quota.CreateMemoryAllocator("server"),
ChannelArgsEndpointConfig(), &executor);
ChannelArgsEndpointConfig(), &executor, engine);
// Test
std::string message = "0xDEADBEEF";
grpc_core::Notification read_done;
@ -97,11 +99,11 @@ TEST_F(WindowsEndpointTest, Conversation) {
AppState(const EventEngine::ResolvedAddress& addr,
std::unique_ptr<WinSocket> client,
std::unique_ptr<WinSocket> server, grpc_core::MemoryQuota& quota,
Executor& executor)
Executor& executor, std::shared_ptr<EventEngine> engine)
: client(addr, std::move(client), quota.CreateMemoryAllocator("client"),
ChannelArgsEndpointConfig(), &executor),
ChannelArgsEndpointConfig(), &executor, engine),
server(addr, std::move(server), quota.CreateMemoryAllocator("server"),
ChannelArgsEndpointConfig(), &executor) {}
ChannelArgsEndpointConfig(), &executor, engine) {}
grpc_core::Notification done;
WindowsEndpoint client;
WindowsEndpoint server;
@ -145,8 +147,9 @@ TEST_F(WindowsEndpointTest, Conversation) {
}
}
};
auto engine = std::make_shared<WindowsEventEngine>();
AppState state(addr, /*client=*/iocp.Watch(sockpair[0]),
/*server=*/iocp.Watch(sockpair[1]), quota, executor);
/*server=*/iocp.Watch(sockpair[1]), quota, executor, engine);
state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
!state.done.HasBeenNotified()) {

Loading…
Cancel
Save