|
|
|
@ -26,33 +26,67 @@ |
|
|
|
|
#include <grpc/event_engine/memory_allocator.h> |
|
|
|
|
#include <grpc/event_engine/slice_buffer.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
|
|
|
|
#include "src/core/lib/event_engine/common_closures.h" |
|
|
|
|
#include "src/core/lib/event_engine/executor/executor.h" |
|
|
|
|
#include "src/core/lib/event_engine/handle_containers.h" |
|
|
|
|
#include "src/core/lib/event_engine/posix_engine/timer_manager.h" |
|
|
|
|
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
|
|
|
|
#include "src/core/lib/event_engine/trace.h" |
|
|
|
|
#include "src/core/lib/event_engine/utils.h" |
|
|
|
|
#include "src/core/lib/event_engine/windows/iocp.h" |
|
|
|
|
#include "src/core/lib/event_engine/windows/windows_endpoint.h" |
|
|
|
|
#include "src/core/lib/event_engine/windows/windows_engine.h" |
|
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
|
#include "src/core/lib/gprpp/time.h" |
|
|
|
|
#include "src/core/lib/iomgr/error.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_event_engine { |
|
|
|
|
namespace experimental { |
|
|
|
|
|
|
|
|
|
// ---- IOCPWorkClosure ----
|
|
|
|
|
|
|
|
|
|
WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(Executor* executor, |
|
|
|
|
IOCP* iocp) |
|
|
|
|
: executor_(executor), iocp_(iocp) { |
|
|
|
|
executor_->Run(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WindowsEventEngine::IOCPWorkClosure::Run() { |
|
|
|
|
auto result = iocp_->Work(std::chrono::seconds(60), [this] { |
|
|
|
|
workers_.fetch_add(1); |
|
|
|
|
executor_->Run(this); |
|
|
|
|
}); |
|
|
|
|
if (result == Poller::WorkResult::kDeadlineExceeded) { |
|
|
|
|
// iocp received no messages. restart the worker
|
|
|
|
|
workers_.fetch_add(1); |
|
|
|
|
executor_->Run(this); |
|
|
|
|
} |
|
|
|
|
if (workers_.fetch_sub(1) == 1) done_signal_.Notify(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() { |
|
|
|
|
done_signal_.WaitForNotification(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ---- WindowsEventEngine ----
|
|
|
|
|
|
|
|
|
|
// TODO(hork): The iomgr timer and execution engine can be reused. It should
|
|
|
|
|
// be separated out from the posix_engine and instantiated as components. It is
|
|
|
|
|
// effectively copied below.
|
|
|
|
|
|
|
|
|
|
struct WindowsEventEngine::Closure final : public EventEngine::Closure { |
|
|
|
|
struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure { |
|
|
|
|
absl::AnyInvocable<void()> cb; |
|
|
|
|
Timer timer; |
|
|
|
|
WindowsEventEngine* engine; |
|
|
|
|
EventEngine::TaskHandle handle; |
|
|
|
|
|
|
|
|
|
void Run() override { |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p executing callback:%s", |
|
|
|
|
engine, HandleToString(handle).c_str()); |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE( |
|
|
|
|
"WindowsEventEngine:%p executing callback:%s", engine, |
|
|
|
|
HandleToString<EventEngine::TaskHandle>(handle).c_str()); |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&engine->mu_); |
|
|
|
|
grpc_core::MutexLock lock(&engine->task_mu_); |
|
|
|
|
engine->known_handles_.erase(handle); |
|
|
|
|
} |
|
|
|
|
cb(); |
|
|
|
@ -63,7 +97,8 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure { |
|
|
|
|
WindowsEventEngine::WindowsEventEngine() |
|
|
|
|
: executor_(std::make_shared<ThreadPool>()), |
|
|
|
|
iocp_(executor_.get()), |
|
|
|
|
timer_manager_(executor_) { |
|
|
|
|
timer_manager_(executor_), |
|
|
|
|
iocp_worker_(executor_.get(), &iocp_) { |
|
|
|
|
WSADATA wsaData; |
|
|
|
|
int status = WSAStartup(MAKEWORD(2, 0), &wsaData); |
|
|
|
|
GPR_ASSERT(status == 0); |
|
|
|
@ -71,25 +106,28 @@ WindowsEventEngine::WindowsEventEngine() |
|
|
|
|
|
|
|
|
|
WindowsEventEngine::~WindowsEventEngine() { |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
grpc_core::MutexLock lock(&task_mu_); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
|
|
|
|
for (auto handle : known_handles_) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s", |
|
|
|
|
this, HandleToString(handle).c_str()); |
|
|
|
|
this, HandleToString<EventEngine::TaskHandle>(handle).c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); |
|
|
|
|
GPR_ASSERT(WSACleanup() == 0); |
|
|
|
|
timer_manager_.Shutdown(); |
|
|
|
|
} |
|
|
|
|
iocp_.Kick(); |
|
|
|
|
iocp_worker_.WaitForShutdown(); |
|
|
|
|
iocp_.Shutdown(); |
|
|
|
|
GPR_ASSERT(WSACleanup() == 0); |
|
|
|
|
timer_manager_.Shutdown(); |
|
|
|
|
executor_->Quiesce(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) { |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
grpc_core::MutexLock lock(&task_mu_); |
|
|
|
|
if (!known_handles_.contains(handle)) return false; |
|
|
|
|
auto* cd = reinterpret_cast<Closure*>(handle.keys[0]); |
|
|
|
|
auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]); |
|
|
|
|
bool r = timer_manager_.TimerCancel(&cd->timer); |
|
|
|
|
known_handles_.erase(handle); |
|
|
|
|
if (r) delete cd; |
|
|
|
@ -117,16 +155,17 @@ void WindowsEventEngine::Run(EventEngine::Closure* closure) { |
|
|
|
|
EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal( |
|
|
|
|
Duration when, absl::AnyInvocable<void()> cb) { |
|
|
|
|
auto when_ts = ToTimestamp(timer_manager_.Now(), when); |
|
|
|
|
auto* cd = new Closure; |
|
|
|
|
auto* cd = new TimerClosure; |
|
|
|
|
cd->cb = std::move(cb); |
|
|
|
|
cd->engine = this; |
|
|
|
|
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd), |
|
|
|
|
aba_token_.fetch_add(1)}; |
|
|
|
|
grpc_core::MutexLock lock(&mu_); |
|
|
|
|
grpc_core::MutexLock lock(&task_mu_); |
|
|
|
|
known_handles_.insert(handle); |
|
|
|
|
cd->handle = handle; |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p scheduling callback:%s", this, |
|
|
|
|
HandleToString(handle).c_str()); |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE( |
|
|
|
|
"WindowsEventEngine:%p scheduling callback:%s", this, |
|
|
|
|
HandleToString<EventEngine::TaskHandle>(handle).c_str()); |
|
|
|
|
timer_manager_.TimerInit(&cd->timer, when_ts, cd); |
|
|
|
|
return handle; |
|
|
|
|
} |
|
|
|
@ -140,15 +179,194 @@ bool WindowsEventEngine::IsWorkerThread() { |
|
|
|
|
GPR_ASSERT(false && "unimplemented"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { |
|
|
|
|
GPR_ASSERT(false && "unimplemented"); |
|
|
|
|
void WindowsEventEngine::OnConnectCompleted( |
|
|
|
|
std::shared_ptr<ConnectionState> state) { |
|
|
|
|
// Connection attempt complete!
|
|
|
|
|
grpc_core::MutexLock lock(&state->mu); |
|
|
|
|
state->on_connected = nullptr; |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock handle_lock(&connection_mu_); |
|
|
|
|
known_connection_handles_.erase(state->connection_handle); |
|
|
|
|
} |
|
|
|
|
// return early if we cannot cancel the connection timeout timer.
|
|
|
|
|
if (!Cancel(state->timer_handle)) return; |
|
|
|
|
auto write_info = state->socket->write_info(); |
|
|
|
|
if (write_info->wsa_error() != 0) { |
|
|
|
|
auto error = GRPC_WSA_ERROR(write_info->wsa_error(), "ConnectEx"); |
|
|
|
|
state->socket->MaybeShutdown(error); |
|
|
|
|
state->on_connected_user_callback(error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// This code should be running in an executor thread already, so the callback
|
|
|
|
|
// can be run directly.
|
|
|
|
|
ChannelArgsEndpointConfig cfg; |
|
|
|
|
state->on_connected_user_callback(std::make_unique<WindowsEndpoint>( |
|
|
|
|
state->address, std::move(state->socket), std::move(state->allocator), |
|
|
|
|
cfg, executor_.get())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EventEngine::ConnectionHandle WindowsEventEngine::Connect( |
|
|
|
|
OnConnectCallback on_connect, const ResolvedAddress& addr, |
|
|
|
|
const EndpointConfig& args, MemoryAllocator memory_allocator, |
|
|
|
|
Duration deadline) { |
|
|
|
|
GPR_ASSERT(false && "unimplemented"); |
|
|
|
|
const EndpointConfig& /* args */, MemoryAllocator memory_allocator, |
|
|
|
|
Duration timeout) { |
|
|
|
|
// TODO(hork): utilize the endpoint config
|
|
|
|
|
absl::Status status; |
|
|
|
|
int istatus; |
|
|
|
|
auto uri = ResolvedAddressToURI(addr); |
|
|
|
|
if (!uri.ok()) { |
|
|
|
|
Run([on_connect = std::move(on_connect), status = uri.status()]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE("EventEngine::%p connecting to %s", this, |
|
|
|
|
uri->c_str()); |
|
|
|
|
// Use dualstack sockets where available.
|
|
|
|
|
ResolvedAddress address = addr; |
|
|
|
|
ResolvedAddress addr6_v4mapped; |
|
|
|
|
if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) { |
|
|
|
|
address = addr6_v4mapped; |
|
|
|
|
} |
|
|
|
|
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, |
|
|
|
|
IOCP::GetDefaultSocketFlags()); |
|
|
|
|
if (sock == INVALID_SOCKET) { |
|
|
|
|
Run([on_connect = std::move(on_connect), |
|
|
|
|
status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
status = PrepareSocket(sock); |
|
|
|
|
if (!status.ok()) { |
|
|
|
|
Run([on_connect = std::move(on_connect), status]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
// Grab the function pointer for ConnectEx for that specific socket It may
|
|
|
|
|
// change depending on the interface.
|
|
|
|
|
LPFN_CONNECTEX ConnectEx; |
|
|
|
|
GUID guid = WSAID_CONNECTEX; |
|
|
|
|
DWORD ioctl_num_bytes; |
|
|
|
|
istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, |
|
|
|
|
sizeof(guid), &ConnectEx, sizeof(ConnectEx), |
|
|
|
|
&ioctl_num_bytes, nullptr, nullptr); |
|
|
|
|
if (istatus != 0) { |
|
|
|
|
Run([on_connect = std::move(on_connect), |
|
|
|
|
status = GRPC_WSA_ERROR( |
|
|
|
|
WSAGetLastError(), |
|
|
|
|
"WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
// bind the local address
|
|
|
|
|
auto local_address = ResolvedAddressMakeWild6(0); |
|
|
|
|
istatus = bind(sock, local_address.address(), local_address.size()); |
|
|
|
|
if (istatus != 0) { |
|
|
|
|
Run([on_connect = std::move(on_connect), |
|
|
|
|
status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
// Connect
|
|
|
|
|
auto watched_socket = iocp_.Watch(sock); |
|
|
|
|
auto* info = watched_socket->write_info(); |
|
|
|
|
bool success = |
|
|
|
|
ConnectEx(watched_socket->socket(), address.address(), address.size(), |
|
|
|
|
nullptr, 0, nullptr, info->overlapped()); |
|
|
|
|
// It wouldn't be unusual to get a success immediately. But we'll still get an
|
|
|
|
|
// IOCP notification, so let's ignore it.
|
|
|
|
|
if (!success) { |
|
|
|
|
int last_error = WSAGetLastError(); |
|
|
|
|
if (last_error != ERROR_IO_PENDING) { |
|
|
|
|
auto status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); |
|
|
|
|
Run([on_connect = std::move(on_connect), status]() mutable { |
|
|
|
|
on_connect(status); |
|
|
|
|
}); |
|
|
|
|
watched_socket->MaybeShutdown(status); |
|
|
|
|
return invalid_connection_handle; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(watched_socket != nullptr); |
|
|
|
|
auto connection_state = std::make_shared<ConnectionState>(); |
|
|
|
|
grpc_core::MutexLock lock(&connection_state->mu); |
|
|
|
|
connection_state->address = address; |
|
|
|
|
connection_state->socket = std::move(watched_socket); |
|
|
|
|
connection_state->on_connected_user_callback = std::move(on_connect); |
|
|
|
|
connection_state->allocator = std::move(memory_allocator); |
|
|
|
|
connection_state->on_connected = |
|
|
|
|
SelfDeletingClosure::Create([this, connection_state]() mutable { |
|
|
|
|
OnConnectCompleted(std::move(connection_state)); |
|
|
|
|
}); |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock conn_lock(&connection_mu_); |
|
|
|
|
connection_state->connection_handle = |
|
|
|
|
ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()), |
|
|
|
|
aba_token_.fetch_add(1)}; |
|
|
|
|
known_connection_handles_.insert(connection_state->connection_handle); |
|
|
|
|
} |
|
|
|
|
connection_state->timer_handle = |
|
|
|
|
RunAfter(timeout, [this, connection_state]() { |
|
|
|
|
grpc_core::MutexLock lock(&connection_state->mu); |
|
|
|
|
if (CancelConnectFromDeadlineTimer(connection_state.get())) { |
|
|
|
|
connection_state->on_connected_user_callback( |
|
|
|
|
absl::DeadlineExceededError("Connection timed out")); |
|
|
|
|
} |
|
|
|
|
// else: The connection attempt could not be canceled. We can assume the
|
|
|
|
|
// connection callback will be called.
|
|
|
|
|
}); |
|
|
|
|
connection_state->socket->NotifyOnWrite(connection_state->on_connected); |
|
|
|
|
return connection_state->connection_handle; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { |
|
|
|
|
if (TaskHandleComparator<ConnectionHandle>::Eq()(handle, |
|
|
|
|
invalid_connection_handle)) { |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE("%s", |
|
|
|
|
"Attempted to cancel an invalid connection handle"); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
// Erase the connection handle, which may be unknown
|
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&connection_mu_); |
|
|
|
|
if (!known_connection_handles_.contains(handle)) { |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE( |
|
|
|
|
"Unknown connection handle: %s", |
|
|
|
|
HandleToString<EventEngine::ConnectionHandle>(handle).c_str()); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
known_connection_handles_.erase(handle); |
|
|
|
|
} |
|
|
|
|
auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]); |
|
|
|
|
grpc_core::MutexLock state_lock(&connection_state->mu); |
|
|
|
|
if (!Cancel(connection_state->timer_handle)) return false; |
|
|
|
|
return CancelConnectInternalStateLocked(connection_state); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WindowsEventEngine::CancelConnectFromDeadlineTimer( |
|
|
|
|
ConnectionState* connection_state) { |
|
|
|
|
// Erase the connection handle, which is guaranteed to exist.
|
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&connection_mu_); |
|
|
|
|
GPR_ASSERT(known_connection_handles_.erase( |
|
|
|
|
connection_state->connection_handle) == 1); |
|
|
|
|
} |
|
|
|
|
return CancelConnectInternalStateLocked(connection_state); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WindowsEventEngine::CancelConnectInternalStateLocked( |
|
|
|
|
ConnectionState* connection_state) { |
|
|
|
|
connection_state->socket->MaybeShutdown( |
|
|
|
|
absl::CancelledError("CancelConnect")); |
|
|
|
|
// Release the connection_state shared_ptr. connection_state is now invalid.
|
|
|
|
|
delete connection_state->on_connected; |
|
|
|
|
GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s", |
|
|
|
|
HandleToString<EventEngine::ConnectionHandle>( |
|
|
|
|
connection_state->connection_handle) |
|
|
|
|
.c_str()); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> |
|
|
|
|