mirror of https://github.com/grpc/grpc.git
This reverts commit 52402afdd4
.
pull/30470/head
parent
756498e157
commit
9d4e0e17fe
74 changed files with 570 additions and 2984 deletions
@ -1,38 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_EXECUTOR_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_EXECUTOR_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// A subset of the EventEngine execution API. See event_engine.h for details
|
||||
class Executor { |
||||
public: |
||||
virtual ~Executor() = default; |
||||
virtual void Run(EventEngine::Closure* closure) = 0; |
||||
virtual void Run(absl::AnyInvocable<void()> closure) = 0; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_EXECUTOR_H
|
@ -1,36 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/threaded_executor.h" |
||||
|
||||
#include <utility> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
ThreadedExecutor::ThreadedExecutor(int reserve_threads) |
||||
: thread_pool_(reserve_threads){}; |
||||
|
||||
void ThreadedExecutor::Run(EventEngine::Closure* closure) { |
||||
thread_pool_.Add([closure]() { closure->Run(); }); |
||||
} |
||||
|
||||
void ThreadedExecutor::Run(absl::AnyInvocable<void()> closure) { |
||||
thread_pool_.Add(std::move(closure)); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -1,44 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/executor.h" |
||||
#include "src/core/lib/event_engine/thread_pool.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class ThreadedExecutor : public Executor { |
||||
public: |
||||
explicit ThreadedExecutor(int reserve_threads); |
||||
~ThreadedExecutor() override = default; |
||||
void Run(EventEngine::Closure* closure) override; |
||||
void Run(absl::AnyInvocable<void()> closure) override; |
||||
|
||||
private: |
||||
ThreadPool thread_pool_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_EXECUTOR_THREADED_EXECUTOR_H
|
@ -1,125 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/thread_pool.h" |
||||
|
||||
#include <utility> |
||||
|
||||
#include "src/core/lib/gprpp/thd.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace posix_engine { |
||||
|
||||
ThreadPool::Thread::Thread(ThreadPool* pool) |
||||
: pool_(pool), |
||||
thd_( |
||||
"posix_eventengine_pool", |
||||
[](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); }, |
||||
this) { |
||||
thd_.Start(); |
||||
} |
||||
ThreadPool::Thread::~Thread() { thd_.Join(); } |
||||
|
||||
void ThreadPool::Thread::ThreadFunc() { |
||||
pool_->ThreadFunc(); |
||||
// Now that we have killed ourselves, we should reduce the thread count
|
||||
grpc_core::MutexLock lock(&pool_->mu_); |
||||
pool_->nthreads_--; |
||||
// Move ourselves to dead list
|
||||
pool_->dead_threads_.push_back(this); |
||||
|
||||
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { |
||||
pool_->shutdown_cv_.Signal(); |
||||
} |
||||
} |
||||
|
||||
void ThreadPool::ThreadFunc() { |
||||
for (;;) { |
||||
// Wait until work is available or we are shutting down.
|
||||
grpc_core::ReleasableMutexLock lock(&mu_); |
||||
if (!shutdown_ && callbacks_.empty()) { |
||||
// If there are too many threads waiting, then quit this thread
|
||||
if (threads_waiting_ >= reserve_threads_) { |
||||
break; |
||||
} |
||||
threads_waiting_++; |
||||
cv_.Wait(&mu_); |
||||
threads_waiting_--; |
||||
} |
||||
// Drain callbacks before considering shutdown to ensure all work
|
||||
// gets completed.
|
||||
if (!callbacks_.empty()) { |
||||
auto cb = std::move(callbacks_.front()); |
||||
callbacks_.pop(); |
||||
lock.Release(); |
||||
cb(); |
||||
} else if (shutdown_) { |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
|
||||
ThreadPool::ThreadPool(int reserve_threads) |
||||
: shutdown_(false), |
||||
reserve_threads_(reserve_threads), |
||||
nthreads_(0), |
||||
threads_waiting_(0) { |
||||
for (int i = 0; i < reserve_threads_; i++) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
nthreads_++; |
||||
new Thread(this); |
||||
} |
||||
} |
||||
|
||||
void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) { |
||||
for (auto* t : *tlist) delete t; |
||||
tlist->clear(); |
||||
} |
||||
|
||||
ThreadPool::~ThreadPool() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
shutdown_ = true; |
||||
cv_.SignalAll(); |
||||
while (nthreads_ != 0) { |
||||
shutdown_cv_.Wait(&mu_); |
||||
} |
||||
ReapThreads(&dead_threads_); |
||||
} |
||||
|
||||
void ThreadPool::Add(absl::AnyInvocable<void()> callback) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
// Add works to the callbacks list
|
||||
callbacks_.push(std::move(callback)); |
||||
// Increase pool size or notify as needed
|
||||
if (threads_waiting_ == 0) { |
||||
// Kick off a new thread
|
||||
nthreads_++; |
||||
new Thread(this); |
||||
} else { |
||||
cv_.Signal(); |
||||
} |
||||
// Also use this chance to harvest dead threads
|
||||
if (!dead_threads_.empty()) { |
||||
ReapThreads(&dead_threads_); |
||||
} |
||||
} |
||||
|
||||
} // namespace posix_engine
|
||||
} // namespace grpc_event_engine
|
@ -1,55 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_SOCKET_NOTIFIER_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_SOCKET_NOTIFIER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// Generically wraps a socket/fd, and manages the registration of callbacks and
|
||||
// triggering of notifications on it.
|
||||
class SocketNotifier { |
||||
public: |
||||
virtual ~SocketNotifier() = default; |
||||
// Schedule on_read to be invoked when the underlying socket
|
||||
// becomes readable.
|
||||
// If the socket is already readable, the callback will be executed as soon as
|
||||
// possible.
|
||||
virtual void NotifyOnRead(EventEngine::Closure* on_read) = 0; |
||||
// Schedule on_write to be invoked when the underlying socket
|
||||
// becomes writable.
|
||||
// If the socket is already writable, the callback will be executed as soon as
|
||||
// possible.
|
||||
virtual void NotifyOnWrite(EventEngine::Closure* on_write) = 0; |
||||
// Set a readable event on the underlying socket.
|
||||
virtual void SetReadable() = 0; |
||||
// Set a writable event on the underlying socket.
|
||||
virtual void SetWritable() = 0; |
||||
// Shutdown a SocketNotifier. After this operation, NotifyXXX and SetXXX
|
||||
// operations cannot be performed.
|
||||
virtual void MaybeShutdown(absl::Status why) = 0; |
||||
// Returns true if the SocketNotifier has been shutdown.
|
||||
virtual bool IsShutdown() = 0; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_SOCKET_NOTIFIER_H
|
@ -1,49 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/event_engine/utils.h" |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <algorithm> |
||||
#include <chrono> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
std::string HandleToString(EventEngine::TaskHandle handle) { |
||||
return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}"); |
||||
} |
||||
|
||||
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, |
||||
EventEngine::Duration delta) { |
||||
return now + |
||||
std::max(grpc_core::Duration::Milliseconds(1), |
||||
grpc_core::Duration::NanosecondsRoundUp(delta.count())) + |
||||
grpc_core::Duration::Milliseconds(1); |
||||
} |
||||
|
||||
size_t Milliseconds(EventEngine::Duration d) { |
||||
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count(); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
@ -1,40 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_UTILS_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_UTILS_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <string> |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
std::string HandleToString(EventEngine::TaskHandle handle); |
||||
|
||||
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, |
||||
EventEngine::Duration delta); |
||||
|
||||
size_t Milliseconds(EventEngine::Duration d); |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_UTILS_H
|
@ -1,148 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include <chrono> |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log_windows.h> |
||||
|
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/event_engine/utils.h" |
||||
#include "src/core/lib/event_engine/windows/iocp.h" |
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
IOCP::IOCP(Executor* executor) noexcept |
||||
: executor_(executor), |
||||
iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, |
||||
(ULONG_PTR)NULL, 0)) { |
||||
GPR_ASSERT(iocp_handle_); |
||||
WSASocketFlagsInit(); |
||||
} |
||||
|
||||
// Shutdown must be called prior to deletion
|
||||
IOCP::~IOCP() {} |
||||
|
||||
WinSocket* IOCP::Watch(SOCKET socket) { |
||||
WinSocket* wrapped_socket = new WinSocket(socket, executor_); |
||||
HANDLE ret = |
||||
CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket), iocp_handle_, |
||||
reinterpret_cast<uintptr_t>(wrapped_socket), 0); |
||||
if (!ret) { |
||||
char* utf8_message = gpr_format_message(WSAGetLastError()); |
||||
gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); |
||||
gpr_free(utf8_message); |
||||
__debugbreak(); |
||||
abort(); |
||||
} |
||||
GPR_ASSERT(ret == iocp_handle_); |
||||
return wrapped_socket; |
||||
} |
||||
|
||||
void IOCP::Shutdown() { |
||||
while (outstanding_kicks_.load() > 0) { |
||||
Work(std::chrono::hours(42)); |
||||
} |
||||
GPR_ASSERT(CloseHandle(iocp_handle_)); |
||||
} |
||||
|
||||
Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { |
||||
static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError( |
||||
absl::StrFormat("IOCP::%p: Received no completions", this)); |
||||
static const absl::Status kKicked = |
||||
absl::AbortedError(absl::StrFormat("IOCP::%p: Awoken from a kick", this)); |
||||
DWORD bytes = 0; |
||||
ULONG_PTR completion_key; |
||||
LPOVERLAPPED overlapped; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "IOCP::%p doing work", this); |
||||
} |
||||
BOOL success = GetQueuedCompletionStatus( |
||||
iocp_handle_, &bytes, &completion_key, &overlapped, |
||||
static_cast<DWORD>(Milliseconds(timeout))); |
||||
if (success == 0 && overlapped == NULL) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this); |
||||
} |
||||
return Poller::DeadlineExceeded{}; |
||||
} |
||||
GPR_ASSERT(completion_key && overlapped); |
||||
if (overlapped == &kick_overlap_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "IOCP::%p kicked", this); |
||||
} |
||||
outstanding_kicks_.fetch_sub(1); |
||||
if (completion_key == (ULONG_PTR)&kick_token_) { |
||||
return Poller::Kicked{}; |
||||
} |
||||
gpr_log(GPR_ERROR, "Unknown custom completion key: %p", completion_key); |
||||
abort(); |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "IOCP::%p got event on OVERLAPPED::%p", this, |
||||
overlapped); |
||||
} |
||||
WinSocket* socket = reinterpret_cast<WinSocket*>(completion_key); |
||||
WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped); |
||||
GPR_ASSERT(info != nullptr); |
||||
if (socket->IsShutdown()) { |
||||
info->SetError(WSAESHUTDOWN); |
||||
} else { |
||||
info->GetOverlappedResult(); |
||||
} |
||||
if (info->closure() != nullptr) return Events{info->closure()}; |
||||
// No callback registered. Set ready and return an empty set
|
||||
info->SetReady(); |
||||
return Events{}; |
||||
} |
||||
|
||||
void IOCP::Kick() { |
||||
outstanding_kicks_.fetch_add(1); |
||||
GPR_ASSERT(PostQueuedCompletionStatus( |
||||
iocp_handle_, 0, reinterpret_cast<ULONG_PTR>(&kick_token_), |
||||
&kick_overlap_)); |
||||
} |
||||
|
||||
DWORD IOCP::GetDefaultSocketFlags() { |
||||
static DWORD wsa_socket_flags = WSASocketFlagsInit(); |
||||
return wsa_socket_flags; |
||||
} |
||||
|
||||
DWORD IOCP::WSASocketFlagsInit() { |
||||
DWORD wsa_socket_flags = WSA_FLAG_OVERLAPPED; |
||||
/* WSA_FLAG_NO_HANDLE_INHERIT may be not supported on the older Windows
|
||||
versions, see |
||||
https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
|
||||
for details. */ |
||||
SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, |
||||
wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT); |
||||
if (sock != INVALID_SOCKET) { |
||||
/* Windows 7, Windows 2008 R2 with SP1 or later */ |
||||
wsa_socket_flags |= WSA_FLAG_NO_HANDLE_INHERIT; |
||||
closesocket(sock); |
||||
} |
||||
return wsa_socket_flags; |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_WINDOWS
|
@ -1,68 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_IOCP_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_IOCP_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/executor.h" |
||||
#include "src/core/lib/event_engine/poller.h" |
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class IOCP final : public Poller { |
||||
public: |
||||
explicit IOCP(Executor* executor) noexcept; |
||||
~IOCP(); |
||||
// Not copyable
|
||||
IOCP(const IOCP&) = delete; |
||||
IOCP& operator=(const IOCP&) = delete; |
||||
// Not moveable
|
||||
IOCP(IOCP&& other) = delete; |
||||
IOCP& operator=(IOCP&& other) = delete; |
||||
|
||||
// interface methods
|
||||
void Shutdown(); |
||||
WorkResult Work(EventEngine::Duration timeout) override; |
||||
void Kick() override; |
||||
|
||||
WinSocket* Watch(SOCKET socket); |
||||
// Return the set of default flags
|
||||
static DWORD GetDefaultSocketFlags(); |
||||
|
||||
private: |
||||
// Initialize default flags via checking platform support
|
||||
static DWORD WSASocketFlagsInit(); |
||||
|
||||
Executor* executor_; |
||||
HANDLE iocp_handle_; |
||||
OVERLAPPED kick_overlap_; |
||||
ULONG kick_token_; |
||||
std::atomic<int> outstanding_kicks_{0}; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif |
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_IOCP_H
|
@ -1,200 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log_windows.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/executor.h" |
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
|
||||
#if defined(__MSYS__) && defined(GPR_ARCH_64) |
||||
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
|
||||
in conjunction with Microsoft Windows headers. */ |
||||
#define GRPC_FIONBIO _IOW('f', 126, uint32_t) |
||||
#else |
||||
#define GRPC_FIONBIO FIONBIO |
||||
#endif |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
WinSocket::WinSocket(SOCKET socket, Executor* executor) noexcept |
||||
: socket_(socket), |
||||
executor_(executor), |
||||
read_info_(OpState(this)), |
||||
write_info_(OpState(this)) {} |
||||
|
||||
WinSocket::~WinSocket() { GPR_ASSERT(is_shutdown_); } |
||||
|
||||
SOCKET WinSocket::socket() { return socket_; } |
||||
|
||||
void WinSocket::MaybeShutdown(absl::Status why) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
// if already shutdown, return early. Otherwise, set the shutdown flag.
|
||||
if (is_shutdown_) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "WinSocket::%p already shutting down", this); |
||||
} |
||||
return; |
||||
} |
||||
is_shutdown_ = true; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, "WinSocket::%p shutting down now. Reason: %s", this, |
||||
why.ToString().c_str()); |
||||
} |
||||
// Grab the function pointer for DisconnectEx for that specific socket.
|
||||
// It may change depending on the interface.
|
||||
GUID guid = WSAID_DISCONNECTEX; |
||||
LPFN_DISCONNECTEX DisconnectEx; |
||||
DWORD ioctl_num_bytes; |
||||
int status = WSAIoctl(socket_, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, |
||||
sizeof(guid), &DisconnectEx, sizeof(DisconnectEx), |
||||
&ioctl_num_bytes, NULL, NULL); |
||||
|
||||
if (status == 0) { |
||||
DisconnectEx(socket_, NULL, 0, 0); |
||||
} else { |
||||
char* utf8_message = gpr_format_message(WSAGetLastError()); |
||||
gpr_log(GPR_INFO, "Unable to retrieve DisconnectEx pointer : %s", |
||||
utf8_message); |
||||
gpr_free(utf8_message); |
||||
} |
||||
closesocket(socket_); |
||||
} |
||||
|
||||
void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (IsShutdown()) { |
||||
info.SetError(WSAESHUTDOWN); |
||||
executor_->Run(closure); |
||||
return; |
||||
}; |
||||
if (absl::exchange(info.has_pending_iocp_, false)) { |
||||
executor_->Run(closure); |
||||
} else { |
||||
info.closure_ = closure; |
||||
} |
||||
} |
||||
|
||||
void WinSocket::NotifyOnRead(EventEngine::Closure* on_read) { |
||||
NotifyOnReady(read_info_, on_read); |
||||
} |
||||
|
||||
void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) { |
||||
NotifyOnReady(write_info_, on_write); |
||||
} |
||||
|
||||
WinSocket::OpState::OpState(WinSocket* win_socket) noexcept |
||||
: win_socket_(win_socket), closure_(nullptr) {} |
||||
|
||||
void WinSocket::OpState::SetReady() { |
||||
grpc_core::MutexLock lock(&win_socket_->mu_); |
||||
GPR_ASSERT(!has_pending_iocp_); |
||||
if (closure_) { |
||||
win_socket_->executor_->Run(closure_); |
||||
} else { |
||||
has_pending_iocp_ = true; |
||||
} |
||||
} |
||||
|
||||
void WinSocket::OpState::SetError(int wsa_error) { |
||||
bytes_transferred_ = 0; |
||||
wsa_error_ = wsa_error; |
||||
} |
||||
|
||||
void WinSocket::OpState::GetOverlappedResult() { |
||||
DWORD flags = 0; |
||||
DWORD bytes; |
||||
BOOL success = WSAGetOverlappedResult(win_socket_->socket(), &overlapped_, |
||||
&bytes, FALSE, &flags); |
||||
bytes_transferred_ = bytes; |
||||
wsa_error_ = success ? 0 : WSAGetLastError(); |
||||
} |
||||
|
||||
void WinSocket::SetReadable() { read_info_.SetReady(); } |
||||
|
||||
void WinSocket::SetWritable() { write_info_.SetReady(); } |
||||
|
||||
bool WinSocket::IsShutdown() { return is_shutdown_; } |
||||
|
||||
WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
gpr_log(GPR_DEBUG, |
||||
"WinSocket::%p looking for matching OVERLAPPED::%p. " |
||||
"read(%p) write(%p)", |
||||
this, overlapped, &read_info_.overlapped_, |
||||
&write_info_.overlapped_); |
||||
} |
||||
if (overlapped == &read_info_.overlapped_) return &read_info_; |
||||
if (overlapped == &write_info_.overlapped_) return &write_info_; |
||||
return nullptr; |
||||
} |
||||
|
||||
namespace { |
||||
|
||||
grpc_error_handle grpc_tcp_set_non_block(SOCKET sock) { |
||||
int status; |
||||
uint32_t param = 1; |
||||
DWORD ret; |
||||
status = WSAIoctl(sock, GRPC_FIONBIO, ¶m, sizeof(param), NULL, 0, &ret, |
||||
NULL, NULL); |
||||
return status == 0 |
||||
? GRPC_ERROR_NONE |
||||
: GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)"); |
||||
} |
||||
|
||||
static grpc_error_handle set_dualstack(SOCKET sock) { |
||||
int status; |
||||
DWORD param = 0; |
||||
status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)¶m, |
||||
sizeof(param)); |
||||
return status == 0 |
||||
? GRPC_ERROR_NONE |
||||
: GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)"); |
||||
} |
||||
|
||||
static grpc_error_handle enable_socket_low_latency(SOCKET sock) { |
||||
int status; |
||||
BOOL param = TRUE; |
||||
status = ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, |
||||
reinterpret_cast<char*>(¶m), sizeof(param)); |
||||
if (status == SOCKET_ERROR) { |
||||
status = WSAGetLastError(); |
||||
} |
||||
return status == 0 ? GRPC_ERROR_NONE |
||||
: GRPC_WSA_ERROR(status, "setsockopt(TCP_NODELAY)"); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
absl::Status PrepareSocket(SOCKET sock) { |
||||
absl::Status err; |
||||
err = grpc_tcp_set_non_block(sock); |
||||
if (!GRPC_ERROR_IS_NONE(err)) return err; |
||||
err = enable_socket_low_latency(sock); |
||||
if (!GRPC_ERROR_IS_NONE(err)) return err; |
||||
err = set_dualstack(sock); |
||||
if (!GRPC_ERROR_IS_NONE(err)) return err; |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_WINDOWS
|
@ -1,112 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WIN_SOCKET_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WIN_SOCKET_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/executor.h" |
||||
#include "src/core/lib/event_engine/socket_notifier.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class WinSocket final : public SocketNotifier { |
||||
public: |
||||
// State related to a Read or Write socket operation
|
||||
class OpState { |
||||
public: |
||||
explicit OpState(WinSocket* win_socket) noexcept; |
||||
// Signal a result has returned
|
||||
// If a callback is already primed for notification, it will be executed via
|
||||
// the WinSocket's Executor. Otherwise, a "pending iocp" flag will
|
||||
// be set.
|
||||
void SetReady(); |
||||
// Set error results for a completed op
|
||||
void SetError(int wsa_error); |
||||
// Retrieve results of overlapped operation (via Winsock API)
|
||||
void GetOverlappedResult(); |
||||
// OVERLAPPED, needed for Winsock API calls
|
||||
OVERLAPPED* overlapped() { return &overlapped_; } |
||||
// Data from the previous operation, set via GetOverlappedResult
|
||||
DWORD bytes_transferred() const { return bytes_transferred_; } |
||||
// Previous error if set.
|
||||
int wsa_error() const { return wsa_error_; } |
||||
EventEngine::Closure* closure() { return closure_; } |
||||
|
||||
private: |
||||
friend class WinSocket; |
||||
|
||||
OVERLAPPED overlapped_; |
||||
WinSocket* win_socket_ = nullptr; |
||||
EventEngine::Closure* closure_ = nullptr; |
||||
bool has_pending_iocp_ = false; |
||||
DWORD bytes_transferred_; |
||||
int wsa_error_; |
||||
}; |
||||
|
||||
WinSocket(SOCKET socket, Executor* executor) noexcept; |
||||
~WinSocket(); |
||||
// Calling NotifyOnRead means either of two things:
|
||||
// - The IOCP already completed in the background, and we need to call
|
||||
// the callback now.
|
||||
// - The IOCP hasn't completed yet, and we're queuing it for later.
|
||||
void NotifyOnRead(EventEngine::Closure* on_read) override; |
||||
void NotifyOnWrite(EventEngine::Closure* on_write) override; |
||||
void SetReadable() override; |
||||
void SetWritable() override; |
||||
// Schedule a shutdown of the socket operations. Will call the pending
|
||||
// operations to abort them. We need to do that this way because of the
|
||||
// various callsites of that function, which happens to be in various
|
||||
// mutex hold states, and that'd be unsafe to call them directly.
|
||||
void MaybeShutdown(absl::Status why) override; |
||||
bool IsShutdown() override; |
||||
|
||||
// Return the appropriate OpState for a given OVERLAPPED
|
||||
// Returns nullptr if the overlapped does not match either read or write ops.
|
||||
OpState* GetOpInfoForOverlapped(OVERLAPPED* overlapped); |
||||
// Getters for the operation state data.
|
||||
OpState* read_info() { return &read_info_; } |
||||
OpState* write_info() { return &write_info_; } |
||||
// Accessor method for underlying socket
|
||||
SOCKET socket(); |
||||
|
||||
private: |
||||
void NotifyOnReady(OpState& info, EventEngine::Closure* callback); |
||||
|
||||
SOCKET socket_; |
||||
grpc_core::Mutex mu_; |
||||
bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false; |
||||
Executor* executor_; |
||||
OpState read_info_ ABSL_GUARDED_BY(mu_); |
||||
OpState write_info_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// Attempt to configure default socket settings
|
||||
absl::Status PrepareSocket(SOCKET sock); |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif |
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WIN_SOCKET_H
|
@ -1,159 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/event_engine/endpoint_config.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
#include <grpc/event_engine/slice_buffer.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/threaded_executor.h" |
||||
#include "src/core/lib/event_engine/handle_containers.h" |
||||
#include "src/core/lib/event_engine/posix_engine/timer_manager.h" |
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/event_engine/utils.h" |
||||
#include "src/core/lib/event_engine/windows/iocp.h" |
||||
#include "src/core/lib/event_engine/windows/windows_engine.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
// TODO(hork): The iomgr timer and execution engine can be reused. It should
|
||||
// be separated out from the posix_engine and instantiated as components. It is
|
||||
// effectively copied below.
|
||||
|
||||
struct WindowsEventEngine::Closure final : public EventEngine::Closure { |
||||
absl::AnyInvocable<void()> cb; |
||||
posix_engine::Timer timer; |
||||
WindowsEventEngine* engine; |
||||
EventEngine::TaskHandle handle; |
||||
|
||||
void Run() override { |
||||
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p executing callback:%s", |
||||
engine, HandleToString(handle).c_str()); |
||||
{ |
||||
grpc_core::MutexLock lock(&engine->mu_); |
||||
engine->known_handles_.erase(handle); |
||||
} |
||||
cb(); |
||||
delete this; |
||||
} |
||||
}; |
||||
|
||||
WindowsEventEngine::WindowsEventEngine() : iocp_(&executor_) { |
||||
WSADATA wsaData; |
||||
int status = WSAStartup(MAKEWORD(2, 0), &wsaData); |
||||
GPR_ASSERT(status == 0); |
||||
} |
||||
|
||||
WindowsEventEngine::~WindowsEventEngine() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { |
||||
for (auto handle : known_handles_) { |
||||
gpr_log(GPR_ERROR, |
||||
"WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s", this, |
||||
HandleToString(handle).c_str()); |
||||
} |
||||
} |
||||
GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); |
||||
GPR_ASSERT(WSACleanup() == 0); |
||||
} |
||||
|
||||
bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (!known_handles_.contains(handle)) return false; |
||||
auto* cd = reinterpret_cast<Closure*>(handle.keys[0]); |
||||
bool r = timer_manager_.TimerCancel(&cd->timer); |
||||
known_handles_.erase(handle); |
||||
if (r) delete cd; |
||||
return r; |
||||
} |
||||
|
||||
EventEngine::TaskHandle WindowsEventEngine::RunAfter( |
||||
Duration when, absl::AnyInvocable<void()> closure) { |
||||
return RunAfterInternal(when, std::move(closure)); |
||||
} |
||||
|
||||
EventEngine::TaskHandle WindowsEventEngine::RunAfter( |
||||
Duration when, EventEngine::Closure* closure) { |
||||
return RunAfterInternal(when, [closure]() { closure->Run(); }); |
||||
} |
||||
|
||||
void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) { |
||||
executor_.Run(std::move(closure)); |
||||
} |
||||
|
||||
void WindowsEventEngine::Run(EventEngine::Closure* closure) { |
||||
executor_.Run(closure); |
||||
} |
||||
|
||||
EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal( |
||||
Duration when, absl::AnyInvocable<void()> cb) { |
||||
auto when_ts = ToTimestamp(timer_manager_.Now(), when); |
||||
auto* cd = new Closure; |
||||
cd->cb = std::move(cb); |
||||
cd->engine = this; |
||||
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd), |
||||
aba_token_.fetch_add(1)}; |
||||
grpc_core::MutexLock lock(&mu_); |
||||
known_handles_.insert(handle); |
||||
cd->handle = handle; |
||||
GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p scheduling callback:%s", this, |
||||
HandleToString(handle).c_str()); |
||||
timer_manager_.TimerInit(&cd->timer, when_ts, cd); |
||||
return handle; |
||||
} |
||||
|
||||
std::unique_ptr<EventEngine::DNSResolver> WindowsEventEngine::GetDNSResolver( |
||||
EventEngine::DNSResolver::ResolverOptions const& /*options*/) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
bool WindowsEventEngine::IsWorkerThread() { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
EventEngine::ConnectionHandle WindowsEventEngine::Connect( |
||||
OnConnectCallback on_connect, const ResolvedAddress& addr, |
||||
const EndpointConfig& args, MemoryAllocator memory_allocator, |
||||
Duration deadline) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> |
||||
WindowsEventEngine::CreateListener( |
||||
Listener::AcceptCallback on_accept, |
||||
absl::AnyInvocable<void(absl::Status)> on_shutdown, |
||||
const EndpointConfig& config, |
||||
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) { |
||||
GPR_ASSERT(false && "unimplemented"); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
#endif // GPR_WINDOWS
|
@ -1,120 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H |
||||
#define GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/event_engine/endpoint_config.h> |
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
#include <grpc/event_engine/slice_buffer.h> |
||||
|
||||
#include "src/core/lib/event_engine/executor/threaded_executor.h" |
||||
#include "src/core/lib/event_engine/handle_containers.h" |
||||
#include "src/core/lib/event_engine/posix_engine/timer_manager.h" |
||||
#include "src/core/lib/event_engine/windows/iocp.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class WindowsEventEngine : public EventEngine { |
||||
public: |
||||
class WindowsEndpoint : public EventEngine::Endpoint { |
||||
public: |
||||
~WindowsEndpoint() override; |
||||
void Read(absl::AnyInvocable<void(absl::Status)> on_read, |
||||
SliceBuffer* buffer, const ReadArgs* args) override; |
||||
void Write(absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
SliceBuffer* data, const WriteArgs* args) override; |
||||
const ResolvedAddress& GetPeerAddress() const override; |
||||
const ResolvedAddress& GetLocalAddress() const override; |
||||
}; |
||||
class WindowsListener : public EventEngine::Listener { |
||||
public: |
||||
~WindowsListener() override; |
||||
absl::StatusOr<int> Bind(const ResolvedAddress& addr) override; |
||||
absl::Status Start() override; |
||||
}; |
||||
class WindowsDNSResolver : public EventEngine::DNSResolver { |
||||
public: |
||||
~WindowsDNSResolver() override; |
||||
LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, |
||||
absl::string_view name, |
||||
absl::string_view default_port, |
||||
Duration timeout) override; |
||||
LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, |
||||
absl::string_view name, |
||||
Duration timeout) override; |
||||
LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, |
||||
absl::string_view name, |
||||
Duration timeout) override; |
||||
bool CancelLookup(LookupTaskHandle handle) override; |
||||
}; |
||||
|
||||
WindowsEventEngine(); |
||||
~WindowsEventEngine() override; |
||||
|
||||
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( |
||||
Listener::AcceptCallback on_accept, |
||||
absl::AnyInvocable<void(absl::Status)> on_shutdown, |
||||
const EndpointConfig& config, |
||||
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) |
||||
override; |
||||
|
||||
ConnectionHandle Connect(OnConnectCallback on_connect, |
||||
const ResolvedAddress& addr, |
||||
const EndpointConfig& args, |
||||
MemoryAllocator memory_allocator, |
||||
Duration timeout) override; |
||||
|
||||
bool CancelConnect(ConnectionHandle handle) override; |
||||
bool IsWorkerThread() override; |
||||
std::unique_ptr<DNSResolver> GetDNSResolver( |
||||
const DNSResolver::ResolverOptions& options) override; |
||||
void Run(Closure* closure) override; |
||||
void Run(absl::AnyInvocable<void()> closure) override; |
||||
TaskHandle RunAfter(Duration when, Closure* closure) override; |
||||
TaskHandle RunAfter(Duration when, |
||||
absl::AnyInvocable<void()> closure) override; |
||||
bool Cancel(TaskHandle handle) override; |
||||
|
||||
private: |
||||
struct Closure; |
||||
EventEngine::TaskHandle RunAfterInternal(Duration when, |
||||
absl::AnyInvocable<void()> cb); |
||||
grpc_core::Mutex mu_; |
||||
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); |
||||
std::atomic<intptr_t> aba_token_{0}; |
||||
|
||||
posix_engine::TimerManager timer_manager_; |
||||
ThreadedExecutor executor_{2}; |
||||
IOCP iocp_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif |
||||
|
||||
#endif // GRPC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
|
@ -1,37 +0,0 @@ |
||||
// Copyright 2022 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/windows/windows_engine.h" |
||||
#include "test/core/event_engine/test_suite/event_engine_test.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
|
||||
int main(int argc, char** argv) { |
||||
testing::InitGoogleTest(&argc, argv); |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
auto factory = []() { |
||||
return absl::make_unique< |
||||
grpc_event_engine::experimental::WindowsEventEngine>(); |
||||
}; |
||||
SetEventEngineFactories(factory, factory); |
||||
return RUN_ALL_TESTS(); |
||||
} |
||||
|
||||
#else // not GPR_WINDOWS
|
||||
|
||||
int main(int /* argc */, char** /* argv */) { return 0; } |
||||
|
||||
#endif // GPR_WINDOWS
|
@ -1,81 +0,0 @@ |
||||
# Copyright 2022 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test") |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
grpc_cc_test( |
||||
name = "iocp_test", |
||||
timeout = "short", |
||||
srcs = ["iocp_test.cc"], |
||||
external_deps = [ |
||||
"gtest", |
||||
"absl/types:variant", |
||||
], |
||||
language = "C++", |
||||
tags = [ |
||||
"no_linux", |
||||
"no_mac", |
||||
"no_test_ios", |
||||
], |
||||
uses_event_engine = False, |
||||
uses_polling = False, |
||||
deps = [ |
||||
"create_sockpair", |
||||
"//:common_event_engine_closures", |
||||
"//:event_engine_utils", |
||||
"//:gpr_platform", |
||||
"//:windows_iocp", |
||||
"//test/core/util:grpc_test_util", |
||||
], |
||||
) |
||||
|
||||
grpc_cc_test( |
||||
name = "win_socket_test", |
||||
timeout = "short", |
||||
srcs = ["win_socket_test.cc"], |
||||
external_deps = ["gtest"], |
||||
language = "C++", |
||||
tags = [ |
||||
"no_linux", |
||||
"no_mac", |
||||
"no_test_ios", |
||||
], |
||||
uses_event_engine = False, |
||||
uses_polling = False, |
||||
deps = [ |
||||
"create_sockpair", |
||||
"//:common_event_engine_closures", |
||||
"//:gpr_platform", |
||||
"//:windows_event_engine", |
||||
"//test/core/util:grpc_test_util", |
||||
], |
||||
) |
||||
|
||||
grpc_cc_library( |
||||
name = "create_sockpair", |
||||
srcs = ["create_sockpair.cc"], |
||||
hdrs = ["create_sockpair.h"], |
||||
external_deps = ["absl/status"], |
||||
language = "C++", |
||||
tags = [ |
||||
"no_linux", |
||||
"no_mac", |
||||
], |
||||
deps = [ |
||||
"//:gpr_platform", |
||||
"//:grpc_base", |
||||
], |
||||
) |
@ -1,80 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include <winsock2.h> |
||||
#include <ws2tcpip.h> |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "test/core/event_engine/windows/create_sockpair.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
void CreateSockpair(SOCKET sockpair[2], DWORD flags) { |
||||
SOCKET svr_sock = INVALID_SOCKET; |
||||
SOCKET lst_sock = INVALID_SOCKET; |
||||
SOCKET cli_sock = INVALID_SOCKET; |
||||
SOCKADDR_IN addr; |
||||
int addr_len = sizeof(addr); |
||||
|
||||
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); |
||||
GPR_ASSERT(lst_sock != INVALID_SOCKET); |
||||
|
||||
memset(&addr, 0, sizeof(addr)); |
||||
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
||||
addr.sin_family = AF_INET; |
||||
GPR_ASSERT(bind(lst_sock, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR); |
||||
GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR); |
||||
GPR_ASSERT(getsockname(lst_sock, (sockaddr*)&addr, &addr_len) != |
||||
SOCKET_ERROR); |
||||
|
||||
cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, flags); |
||||
GPR_ASSERT(cli_sock != INVALID_SOCKET); |
||||
|
||||
GPR_ASSERT(WSAConnect(cli_sock, (sockaddr*)&addr, addr_len, NULL, NULL, NULL, |
||||
NULL) == 0); |
||||
svr_sock = accept(lst_sock, (sockaddr*)&addr, &addr_len); |
||||
GPR_ASSERT(svr_sock != INVALID_SOCKET); |
||||
|
||||
closesocket(lst_sock); |
||||
// TODO(hork): see if we can migrate this to IPv6, or break up the socket prep
|
||||
// stages.
|
||||
// Historical note: This method creates an ipv4 sockpair, which cannot
|
||||
// be made dual stack. This was silently preventing TCP_NODELAY from being
|
||||
// enabled, but not causing an unrecoverable error. So this is left as a
|
||||
// logged status. WSAEINVAL is expected.
|
||||
auto status = PrepareSocket(cli_sock); |
||||
// if (!status.ok()) {
|
||||
// gpr_log(GPR_DEBUG, "Error preparing client socket: %s",
|
||||
// status.ToString().c_str());
|
||||
// }
|
||||
status = PrepareSocket(svr_sock); |
||||
// if (!status.ok()) {
|
||||
// gpr_log(GPR_DEBUG, "Error preparing server socket: %s",
|
||||
// status.ToString().c_str());
|
||||
// }
|
||||
|
||||
sockpair[0] = svr_sock; |
||||
sockpair[1] = cli_sock; |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_WINDOWS
|
@ -1,31 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_WINDOWS_CREATE_SOCKPAIR_H |
||||
#define GRPC_TEST_CORE_EVENT_ENGINE_WINDOWS_CREATE_SOCKPAIR_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
#ifdef GPR_WINDOWS |
||||
|
||||
#include <winsock2.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
void CreateSockpair(SOCKET sockpair[2], DWORD flags); |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_WINDOWS
|
||||
#endif // GRPC_TEST_CORE_EVENT_ENGINE_WINDOWS_CREATE_SOCKPAIR_H
|
@ -1,359 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
#include <thread> |
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/time/time.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/log_windows.h> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/executor/threaded_executor.h" |
||||
#include "src/core/lib/event_engine/poller.h" |
||||
#include "src/core/lib/event_engine/promise.h" |
||||
#include "src/core/lib/event_engine/windows/iocp.h" |
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "test/core/event_engine/windows/create_sockpair.h" |
||||
|
||||
namespace { |
||||
using ::grpc_event_engine::experimental::AnyInvocableClosure; |
||||
using ::grpc_event_engine::experimental::CreateSockpair; |
||||
using ::grpc_event_engine::experimental::EventEngine; |
||||
using ::grpc_event_engine::experimental::IOCP; |
||||
using ::grpc_event_engine::experimental::Poller; |
||||
using ::grpc_event_engine::experimental::Promise; |
||||
using ::grpc_event_engine::experimental::SelfDeletingClosure; |
||||
using ::grpc_event_engine::experimental::ThreadedExecutor; |
||||
using ::grpc_event_engine::experimental::WinSocket; |
||||
} // namespace
|
||||
|
||||
class IOCPTest : public testing::Test {}; |
||||
|
||||
TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) { |
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); |
||||
WinSocket* wrapped_client_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[0])); |
||||
WinSocket* wrapped_server_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[1])); |
||||
Promise<bool> read_called{false}; |
||||
Promise<bool> write_called{false}; |
||||
DWORD flags = 0; |
||||
AnyInvocableClosure* on_read; |
||||
AnyInvocableClosure* on_write; |
||||
{ |
||||
// When the client gets some data, ensure it matches what we expect.
|
||||
WSABUF read_wsabuf; |
||||
read_wsabuf.len = 2048; |
||||
char read_char_buffer[2048]; |
||||
read_wsabuf.buf = read_char_buffer; |
||||
DWORD bytes_rcvd; |
||||
memset(wrapped_client_socket->read_info()->overlapped(), 0, |
||||
sizeof(OVERLAPPED)); |
||||
int status = |
||||
WSARecv(wrapped_client_socket->socket(), &read_wsabuf, 1, &bytes_rcvd, |
||||
&flags, wrapped_client_socket->read_info()->overlapped(), NULL); |
||||
// Expecting error 997, WSA_IO_PENDING
|
||||
EXPECT_EQ(status, -1); |
||||
int last_error = WSAGetLastError(); |
||||
ASSERT_EQ(last_error, WSA_IO_PENDING); |
||||
on_read = new AnyInvocableClosure([wrapped_client_socket, &read_called, |
||||
&read_wsabuf, &bytes_rcvd]() { |
||||
gpr_log(GPR_DEBUG, "Notified on read"); |
||||
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10); |
||||
EXPECT_STREQ(read_wsabuf.buf, "hello!"); |
||||
read_called.Set(true); |
||||
}); |
||||
wrapped_client_socket->NotifyOnRead(on_read); |
||||
} |
||||
{ |
||||
// Have the server send a message to the client
|
||||
WSABUF write_wsabuf; |
||||
char write_char_buffer[2048] = "hello!"; |
||||
write_wsabuf.len = 2048; |
||||
write_wsabuf.buf = write_char_buffer; |
||||
DWORD bytes_sent; |
||||
memset(wrapped_server_socket->write_info()->overlapped(), 0, |
||||
sizeof(OVERLAPPED)); |
||||
int status = |
||||
WSASend(wrapped_server_socket->socket(), &write_wsabuf, 1, &bytes_sent, |
||||
0, wrapped_server_socket->write_info()->overlapped(), NULL); |
||||
EXPECT_EQ(status, 0); |
||||
if (status != 0) { |
||||
int error_num = WSAGetLastError(); |
||||
char* utf8_message = gpr_format_message(error_num); |
||||
gpr_log(GPR_INFO, "Error sending data: (%d) %s", error_num, utf8_message); |
||||
gpr_free(utf8_message); |
||||
} |
||||
on_write = new AnyInvocableClosure([&write_called] { |
||||
gpr_log(GPR_DEBUG, "Notified on write"); |
||||
write_called.Set(true); |
||||
}); |
||||
wrapped_server_socket->NotifyOnWrite(on_write); |
||||
} |
||||
// Doing work for WSASend
|
||||
auto work_result = iocp.Work(std::chrono::seconds(10)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); |
||||
Poller::Events closures = absl::get<Poller::Events>(work_result); |
||||
ASSERT_EQ(closures.size(), 1); |
||||
executor.Run(closures[0]); |
||||
// Doing work for WSARecv
|
||||
work_result = iocp.Work(std::chrono::seconds(10)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); |
||||
closures = absl::get<Poller::Events>(work_result); |
||||
ASSERT_EQ(closures.size(), 1); |
||||
executor.Run(closures[0]); |
||||
// wait for the callbacks to run
|
||||
ASSERT_TRUE(read_called.WaitWithTimeout(absl::Seconds(10))); |
||||
ASSERT_TRUE(write_called.WaitWithTimeout(absl::Seconds(10))); |
||||
|
||||
delete on_read; |
||||
delete on_write; |
||||
wrapped_client_socket->MaybeShutdown(absl::OkStatus()); |
||||
wrapped_server_socket->MaybeShutdown(absl::OkStatus()); |
||||
delete wrapped_client_socket; |
||||
delete wrapped_server_socket; |
||||
} |
||||
|
||||
TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) { |
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); |
||||
WinSocket* wrapped_client_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[0])); |
||||
Promise<bool> read_called{false}; |
||||
DWORD flags = 0; |
||||
AnyInvocableClosure* on_read; |
||||
{ |
||||
// Set the client to receive asynchronously
|
||||
// Prepare a notification callback, but don't register it yet.
|
||||
WSABUF read_wsabuf; |
||||
read_wsabuf.len = 2048; |
||||
char read_char_buffer[2048]; |
||||
read_wsabuf.buf = read_char_buffer; |
||||
DWORD bytes_rcvd; |
||||
memset(wrapped_client_socket->read_info()->overlapped(), 0, |
||||
sizeof(OVERLAPPED)); |
||||
int status = |
||||
WSARecv(wrapped_client_socket->socket(), &read_wsabuf, 1, &bytes_rcvd, |
||||
&flags, wrapped_client_socket->read_info()->overlapped(), NULL); |
||||
// Expecting error 997, WSA_IO_PENDING
|
||||
EXPECT_EQ(status, -1); |
||||
int last_error = WSAGetLastError(); |
||||
ASSERT_EQ(last_error, WSA_IO_PENDING); |
||||
on_read = new AnyInvocableClosure([wrapped_client_socket, &read_called, |
||||
&read_wsabuf, &bytes_rcvd]() { |
||||
gpr_log(GPR_DEBUG, "Notified on read"); |
||||
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10); |
||||
EXPECT_STREQ(read_wsabuf.buf, "hello!"); |
||||
read_called.Set(true); |
||||
}); |
||||
} |
||||
{ |
||||
// Have the server send a message to the client. No need to track via IOCP
|
||||
WSABUF write_wsabuf; |
||||
char write_char_buffer[2048] = "hello!"; |
||||
write_wsabuf.len = 2048; |
||||
write_wsabuf.buf = write_char_buffer; |
||||
DWORD bytes_sent; |
||||
OVERLAPPED write_overlapped; |
||||
memset(&write_overlapped, 0, sizeof(OVERLAPPED)); |
||||
int status = WSASend(sockpair[1], &write_wsabuf, 1, &bytes_sent, 0, |
||||
&write_overlapped, NULL); |
||||
EXPECT_EQ(status, 0); |
||||
} |
||||
// IOCP::Work without any notification callbacks should return no Events.
|
||||
auto work_result = iocp.Work(std::chrono::seconds(2)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); |
||||
Poller::Events closures = absl::get<Poller::Events>(work_result); |
||||
ASSERT_EQ(closures.size(), 0); |
||||
// register the closure, which should trigger it immediately.
|
||||
wrapped_client_socket->NotifyOnRead(on_read); |
||||
// wait for the callbacks to run
|
||||
ASSERT_TRUE(read_called.WaitWithTimeout(absl::Seconds(10))); |
||||
|
||||
delete on_read; |
||||
wrapped_client_socket->MaybeShutdown(absl::OkStatus()); |
||||
delete wrapped_client_socket; |
||||
} |
||||
|
||||
TEST_F(IOCPTest, KickWorks) { |
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
Promise<bool> kicked{false}; |
||||
executor.Run([&iocp, &kicked] { |
||||
Poller::WorkResult result = iocp.Work(std::chrono::seconds(30)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); |
||||
kicked.Set(true); |
||||
}); |
||||
executor.Run([&iocp] { |
||||
// give the worker thread a chance to start
|
||||
absl::SleepFor(absl::Milliseconds(42)); |
||||
iocp.Kick(); |
||||
}); |
||||
// wait for the callbacks to run
|
||||
ASSERT_TRUE(kicked.WaitWithTimeout(absl::Seconds(10))); |
||||
} |
||||
|
||||
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) { |
||||
// TODO(hork): evaluate if a kick count is going to be useful.
|
||||
// This documents the existing poller's behavior of maintaining a kick count,
|
||||
// but it's unclear if it's going to be needed.
|
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
// kick twice
|
||||
iocp.Kick(); |
||||
iocp.Kick(); |
||||
// Assert the next two WorkResults are kicks
|
||||
auto result = iocp.Work(std::chrono::milliseconds(1)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); |
||||
result = iocp.Work(std::chrono::milliseconds(1)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); |
||||
// followed by a DeadlineExceeded
|
||||
result = iocp.Work(std::chrono::milliseconds(1)); |
||||
ASSERT_TRUE(absl::holds_alternative<Poller::DeadlineExceeded>(result)); |
||||
} |
||||
|
||||
TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) { |
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); |
||||
closesocket(sockpair[0]); |
||||
ASSERT_DEATH( |
||||
{ |
||||
WinSocket* wrapped_client_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[0])); |
||||
}, |
||||
""); |
||||
} |
||||
|
||||
TEST_F(IOCPTest, StressTestThousandsOfSockets) { |
||||
// Start 100 threads, each with their own IOCP
|
||||
// On each thread, create 50 socket pairs (100 sockets) and have them exchange
|
||||
// a message before shutting down.
|
||||
int thread_count = 100; |
||||
int sockets_per_thread = 50; |
||||
std::atomic<int> read_count{0}; |
||||
std::atomic<int> write_count{0}; |
||||
std::vector<std::thread> threads; |
||||
threads.reserve(thread_count); |
||||
for (int thread_n = 0; thread_n < thread_count; thread_n++) { |
||||
threads.emplace_back([thread_n, sockets_per_thread, &read_count, |
||||
&write_count] { |
||||
ThreadedExecutor executor{2}; |
||||
IOCP iocp(&executor); |
||||
// Start a looping worker thread with a moderate timeout
|
||||
std::thread iocp_worker([&iocp, &executor] { |
||||
Poller::WorkResult result; |
||||
do { |
||||
result = iocp.Work(std::chrono::seconds(1)); |
||||
if (absl::holds_alternative<Poller::Events>(result)) { |
||||
for (auto& event : absl::get<Poller::Events>(result)) { |
||||
executor.Run(event); |
||||
} |
||||
} |
||||
} while (!absl::holds_alternative<Poller::DeadlineExceeded>(result)); |
||||
}); |
||||
for (int i = 0; i < sockets_per_thread; i++) { |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags()); |
||||
WinSocket* wrapped_client_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[0])); |
||||
WinSocket* wrapped_server_socket = |
||||
static_cast<WinSocket*>(iocp.Watch(sockpair[1])); |
||||
wrapped_client_socket->NotifyOnRead( |
||||
SelfDeletingClosure::Create([&read_count, wrapped_client_socket] { |
||||
read_count.fetch_add(1); |
||||
wrapped_client_socket->MaybeShutdown(absl::OkStatus()); |
||||
})); |
||||
wrapped_server_socket->NotifyOnWrite( |
||||
SelfDeletingClosure::Create([&write_count, wrapped_server_socket] { |
||||
write_count.fetch_add(1); |
||||
wrapped_server_socket->MaybeShutdown(absl::OkStatus()); |
||||
})); |
||||
{ |
||||
// Set the client to receive
|
||||
WSABUF read_wsabuf; |
||||
read_wsabuf.len = 20; |
||||
char read_char_buffer[20]; |
||||
read_wsabuf.buf = read_char_buffer; |
||||
DWORD bytes_rcvd; |
||||
DWORD flags = 0; |
||||
memset(wrapped_client_socket->read_info()->overlapped(), 0, |
||||
sizeof(OVERLAPPED)); |
||||
int status = WSARecv( |
||||
wrapped_client_socket->socket(), &read_wsabuf, 1, &bytes_rcvd, |
||||
&flags, wrapped_client_socket->read_info()->overlapped(), NULL); |
||||
// Expecting error 997, WSA_IO_PENDING
|
||||
EXPECT_EQ(status, -1); |
||||
int last_error = WSAGetLastError(); |
||||
ASSERT_EQ(last_error, WSA_IO_PENDING); |
||||
} |
||||
{ |
||||
// Have the server send a message to the client.
|
||||
WSABUF write_wsabuf; |
||||
char write_char_buffer[20] = "hello!"; |
||||
write_wsabuf.len = 20; |
||||
write_wsabuf.buf = write_char_buffer; |
||||
DWORD bytes_sent; |
||||
memset(wrapped_server_socket->write_info()->overlapped(), 0, |
||||
sizeof(OVERLAPPED)); |
||||
int status = WSASend( |
||||
wrapped_server_socket->socket(), &write_wsabuf, 1, &bytes_sent, 0, |
||||
wrapped_server_socket->write_info()->overlapped(), NULL); |
||||
EXPECT_EQ(status, 0); |
||||
} |
||||
} |
||||
iocp_worker.join(); |
||||
}); |
||||
} |
||||
for (auto& t : threads) { |
||||
t.join(); |
||||
} |
||||
absl::Time deadline = absl::Now() + absl::Seconds(30); |
||||
while (read_count.load() != thread_count * sockets_per_thread || |
||||
write_count.load() != thread_count * sockets_per_thread) { |
||||
absl::SleepFor(absl::Milliseconds(50)); |
||||
if (deadline < absl::Now()) { |
||||
FAIL() << "Deadline exceeded with " << read_count.load() << " reads and " |
||||
<< write_count.load() << " writes"; |
||||
} |
||||
} |
||||
ASSERT_EQ(read_count.load(), thread_count * sockets_per_thread); |
||||
ASSERT_EQ(write_count.load(), thread_count * sockets_per_thread); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_init(); |
||||
int status = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return status; |
||||
} |
||||
#else // not GPR_WINDOWS
|
||||
int main(int /* argc */, char** /* argv */) { return 0; } |
||||
#endif |
@ -1,102 +0,0 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_WINDOWS |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/time/time.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/log_windows.h> |
||||
|
||||
#include "src/core/lib/event_engine/common_closures.h" |
||||
#include "src/core/lib/event_engine/executor/threaded_executor.h" |
||||
#include "src/core/lib/event_engine/windows/iocp.h" |
||||
#include "src/core/lib/event_engine/windows/win_socket.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "test/core/event_engine/windows/create_sockpair.h" |
||||
|
||||
namespace { |
||||
using ::grpc_event_engine::experimental::AnyInvocableClosure; |
||||
using ::grpc_event_engine::experimental::CreateSockpair; |
||||
using ::grpc_event_engine::experimental::IOCP; |
||||
using ::grpc_event_engine::experimental::ThreadedExecutor; |
||||
using ::grpc_event_engine::experimental::WinSocket; |
||||
} // namespace
|
||||
|
||||
class WinSocketTest : public testing::Test {}; |
||||
|
||||
TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) { |
||||
ThreadedExecutor executor{2}; |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); |
||||
WinSocket wrapped_client_socket(sockpair[0], &executor); |
||||
WinSocket wrapped_server_socket(sockpair[1], &executor); |
||||
bool read_called = false; |
||||
AnyInvocableClosure on_read([&read_called]() { read_called = true; }); |
||||
wrapped_client_socket.NotifyOnRead(&on_read); |
||||
AnyInvocableClosure on_write([] { FAIL() << "No Write expected"; }); |
||||
wrapped_client_socket.NotifyOnWrite(&on_write); |
||||
ASSERT_FALSE(read_called); |
||||
wrapped_client_socket.SetReadable(); |
||||
absl::Time deadline = absl::Now() + absl::Seconds(10); |
||||
while (!read_called) { |
||||
absl::SleepFor(absl::Milliseconds(42)); |
||||
if (deadline < absl::Now()) { |
||||
FAIL() << "Deadline exceeded"; |
||||
} |
||||
} |
||||
ASSERT_TRUE(read_called); |
||||
wrapped_client_socket.MaybeShutdown(absl::CancelledError("done")); |
||||
wrapped_server_socket.MaybeShutdown(absl::CancelledError("done")); |
||||
} |
||||
|
||||
TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) { |
||||
ThreadedExecutor executor{2}; |
||||
SOCKET sockpair[2]; |
||||
CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags()); |
||||
WinSocket wrapped_client_socket(sockpair[0], &executor); |
||||
wrapped_client_socket.MaybeShutdown(absl::CancelledError("testing")); |
||||
bool read_called = false; |
||||
AnyInvocableClosure closure([&wrapped_client_socket, &read_called] { |
||||
ASSERT_EQ(wrapped_client_socket.read_info()->bytes_transferred(), 0); |
||||
ASSERT_EQ(wrapped_client_socket.read_info()->wsa_error(), WSAESHUTDOWN); |
||||
read_called = true; |
||||
}); |
||||
wrapped_client_socket.NotifyOnRead(&closure); |
||||
absl::Time deadline = absl::Now() + absl::Seconds(3); |
||||
while (!read_called) { |
||||
absl::SleepFor(absl::Milliseconds(42)); |
||||
if (deadline < absl::Now()) { |
||||
FAIL() << "Deadline exceeded"; |
||||
} |
||||
} |
||||
ASSERT_TRUE(read_called); |
||||
closesocket(sockpair[1]); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_init(); |
||||
int status = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return status; |
||||
} |
||||
|
||||
#else // not GPR_WINDOWS
|
||||
int main(int /* argc */, char** /* argv */) { return 0; } |
||||
#endif |
Loading…
Reference in new issue