[EventEngine] Windows Endpoint: optimize reads by chaining synchronous WSARecv operations (#32563)

Built on https://github.com/grpc/grpc/pull/32560

When calling EventEngine::Read, if a synchronous WSARecv call completes
successfully and 1) the read buffer is not full, and 2) the stream
remains open, then the endpoint will now chain execution of more
synchronous WSARecvs. The chain is broken and the on_read callback is
called when either there are errors, the next call would block, the
buffer is full, or the stream is closed.

Something like this is helpful to prevent excessive read callback
execution under a flood of tiny payloads, presuming messages are not
being combined as one would usually expect (see
`//test/core/iomgr:endpoint_pair_test`, and Nagle's algorithm).
pull/32616/head
AJ Heller 2 years ago committed by GitHub
parent 1088046a57
commit ae55fb04ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 165
      src/core/lib/event_engine/windows/windows_endpoint.cc
  2. 14
      src/core/lib/event_engine/windows/windows_endpoint.h
  3. 2
      src/core/lib/event_engine/windows/windows_listener.cc

@ -37,6 +37,14 @@ namespace {
constexpr size_t kDefaultTargetReadSize = 8192;
constexpr int kMaxWSABUFCount = 16;
// Logs every slice in `buffer`, one log line per slice, prefixed with
// `context_string`. Debug/tracing helper only; not used on the fast path.
void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
  for (size_t i = 0; i < buffer->Count(); i++) {
    auto str = buffer->MutableSliceAt(i).as_string_view();
    // `%.*s` takes an `int` precision via varargs; `string_view::length()`
    // is `size_t`, so cast explicitly (size_t is 64-bit on Win64 while int
    // is 32-bit — passing it unconverted is undefined behavior). The
    // context string is also printed length-bounded because a string_view's
    // data() is not guaranteed to be NUL-terminated.
    gpr_log(GPR_INFO, "%.*s: %.*s", static_cast<int>(context_string.length()),
            context_string.data(), static_cast<int>(str.length()), str.data());
  }
}
} // namespace
WindowsEndpoint::WindowsEndpoint(
@ -68,26 +76,14 @@ WindowsEndpoint::~WindowsEndpoint() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p destroyed", this);
}
bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* /* args */) {
absl::Status WindowsEndpoint::DoTcpRead(SliceBuffer* buffer) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
if (io_state_->socket->IsShutdown()) {
executor_->Run([on_read = std::move(on_read)]() mutable {
on_read(absl::UnavailableError("Socket is shutting down."));
});
return false;
return absl::UnavailableError("Socket is shutting down.");
}
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
// TODO(hork): introduce a last_read_buffer to save unused sliced.
buffer->Clear();
// TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
// Choose an appropriate size.
size_t min_read_size = kDefaultTargetReadSize;
if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
}
GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount);
WSABUF wsa_buffers[kMaxWSABUFCount];
for (size_t i = 0; i < buffer->Count(); i++) {
auto& slice = buffer->MutableSliceAt(i);
wsa_buffers[i].buf = (char*)slice.begin();
@ -95,49 +91,63 @@ bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
}
DWORD bytes_read = 0;
DWORD flags = 0;
// First let's try a synchronous, non-blocking read.
// First try a synchronous, non-blocking read.
int status =
WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
(DWORD)buffer->Count(), &bytes_read, &flags, nullptr, nullptr);
int wsa_error = status == 0 ? 0 : WSAGetLastError();
// Did we get data immediately ? Yay.
if (wsa_error != WSAEWOULDBLOCK) {
// Data or some error was returned immediately.
io_state_->socket->read_info()->SetResult(
{/*wsa_error=*/wsa_error, /*bytes_read=*/bytes_read});
absl::Status result;
if (bytes_read == 0) {
result = absl::UnavailableError("End of TCP stream");
grpc_core::StatusSetInt(&result, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
buffer->Clear();
} else {
result = absl::OkStatus();
// prune slicebuffer
if (bytes_read != buffer->Length()) {
buffer->RemoveLastNBytes(buffer->Length() - bytes_read);
}
}
executor_->Run(
[result, on_read = std::move(on_read)]() mutable { on_read(result); });
return false;
executor_->Run(&io_state_->handle_read_event);
return absl::OkStatus();
}
// If the endpoint has already received some data, and the next call would
// block, return the data in case that is all the data the reader expects.
if (io_state_->handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
return absl::OkStatus();
}
// Otherwise, let's retry, by queuing a read.
memset(io_state_->socket->read_info()->overlapped(), 0, sizeof(OVERLAPPED));
status = WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
(DWORD)buffer->Count(), &bytes_read, &flags,
io_state_->socket->read_info()->overlapped(), nullptr);
wsa_error = status == 0 ? 0 : WSAGetLastError();
if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
// Async read returned immediately with an error
executor_->Run([this, on_read = std::move(on_read), wsa_error]() mutable {
on_read(GRPC_WSA_ERROR(
wsa_error,
absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str()));
return GRPC_WSA_ERROR(
wsa_error,
absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str());
}
io_state_->socket->NotifyOnRead(&io_state_->handle_read_event);
return absl::OkStatus();
}
// Initiates a read into `buffer`. Returns false always: completion (success
// or failure) is delivered asynchronously via `on_read` on the executor.
bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
                           SliceBuffer* buffer, const ReadArgs* /* args */) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
  if (io_state_->socket->IsShutdown()) {
    // Fail asynchronously so callers never observe on_read invoked inline.
    executor_->Run([on_read = std::move(on_read)]() mutable {
      on_read(absl::UnavailableError("Socket is shutting down."));
    });
    return false;
  }
  buffer->Clear();
  // Reuse any unconsumed slices left over from a previous chained read
  // before allocating fresh storage.
  io_state_->handle_read_event.DonateSpareSlices(buffer);
  // TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
  // Choose an appropriate size.
  size_t min_read_size = kDefaultTargetReadSize;
  if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
    buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
  }
  // Arm the read closure with the destination buffer and user callback,
  // then attempt the read. DoTcpRead registers the closure for overlapped
  // completion itself (NotifyOnRead) when the operation is pending, and
  // enqueues it directly on immediate completion — so it must NOT also be
  // registered here: arming the permanent closure twice could deliver the
  // callback twice.
  io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
  auto status = DoTcpRead(buffer);
  if (!status.ok()) {
    // The read could not be initiated; report the failure asynchronously.
    io_state_->endpoint->executor_->Run([this, status]() {
      io_state_->handle_read_event.ExecuteCallbackAndReset(status);
    });
  }
  // Reads never complete inline.
  return false;
}
@ -201,12 +211,10 @@ bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
}
}
auto write_info = io_state_->socket->write_info();
memset(write_info->overlapped(), 0, sizeof(OVERLAPPED));
status =
WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
(DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
write_info->overlapped(), nullptr);
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
@ -270,37 +278,66 @@ void WindowsEndpoint::HandleReadClosure::Run() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event",
io_state->endpoint);
absl::Status status;
auto cb_cleanup = absl::MakeCleanup([this, &status]() {
auto cb = std::move(cb_);
Reset();
cb(status);
});
const auto result = io_state->socket->read_info()->result();
if (result.wsa_error != 0) {
status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
buffer_->Clear();
return;
return ExecuteCallbackAndReset(status);
}
if (result.bytes_transferred > 0) {
GPR_ASSERT(result.bytes_transferred <= buffer_->Length());
if (result.bytes_transferred != buffer_->Length()) {
buffer_->RemoveLastNBytes(buffer_->Length() - result.bytes_transferred);
}
GPR_ASSERT(result.bytes_transferred == buffer_->Length());
if (result.bytes_transferred == 0) {
// Either the endpoint is shut down or we've seen the end of the stream
if (grpc_event_engine_endpoint_data_trace.enabled()) {
for (size_t i = 0; i < buffer_->Count(); i++) {
auto str = buffer_->RefSlice(i).as_string_view();
gpr_log(GPR_INFO, "WindowsEndpoint::%p READ (peer=%s): %.*s",
io_state->endpoint,
io_state->endpoint->peer_address_string_.c_str(), str.length(),
str.data());
}
DumpSliceBuffer(
buffer_, absl::StrFormat("WindowsEndpoint::%p READ (peer=%s)",
io_state->endpoint,
io_state->endpoint->peer_address_string_));
}
return;
status = absl::UnavailableError("End of TCP stream");
grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
buffer_->Swap(last_read_buffer_);
return ExecuteCallbackAndReset(status);
}
GPR_DEBUG_ASSERT(result.bytes_transferred > 0);
GPR_DEBUG_ASSERT(result.bytes_transferred <= buffer_->Length());
buffer_->MoveFirstNBytesIntoSliceBuffer(result.bytes_transferred,
last_read_buffer_);
if (buffer_->Length() == 0) {
buffer_->Swap(last_read_buffer_);
return ExecuteCallbackAndReset(status);
}
// Doing another read. Let's keep the AsyncIOState alive a bit longer.
io_state_ = std::move(io_state);
status = io_state_->endpoint->DoTcpRead(buffer_);
if (!status.ok()) {
ExecuteCallbackAndReset(status);
}
// Either the endpoint is shut down or we've seen the end of the stream
buffer_->Clear();
status = absl::UnavailableError("End of TCP stream");
}
// If a previous chained read left bytes behind in last_read_buffer_, hand
// them to the caller immediately instead of queuing another recv.
//
// Returns true when the pending callback has been scheduled with the
// already-read data; false when no leftover data was available.
bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
  if (last_read_buffer_.Length() == 0) return false;
  // Move the leftover bytes into the caller's buffer, then deliver them
  // asynchronously on the executor — the callback must not run inline.
  buffer_->Swap(last_read_buffer_);
  io_state_->endpoint->executor_->Run(
      [this]() { ExecuteCallbackAndReset(absl::OkStatus()); });
  return true;
}
// Invokes the stored read callback with `status` and returns this closure
// to its idle state.
void WindowsEndpoint::HandleReadClosure::ExecuteCallbackAndReset(
absl::Status status) {
// Move the callback out and Reset() the per-request state *before*
// invoking it, so the callback may immediately start another read that
// re-primes this same (permanent) closure.
auto cb = std::move(cb_);
Reset();
cb(status);
}
// Moves any leftover, already-allocated slices from the previous read into
// `buffer`, so the next read can reuse them instead of allocating.
//
// Preconditions: `buffer` is empty, and this closure is fully reset (no
// read in flight).
void WindowsEndpoint::HandleReadClosure::DonateSpareSlices(
    SliceBuffer* buffer) {
  // A primed closure still owns its buffer pointer; donation is only legal
  // between reads, while the closure is in the reset state.
  GPR_ASSERT(buffer_ == nullptr);
  // The destination must arrive empty so the swap purely donates slices
  // rather than trading contents.
  GPR_ASSERT(buffer->Length() == 0);
  buffer->Swap(last_read_buffer_);
}
void WindowsEndpoint::HandleWriteClosure::Run() {

@ -49,11 +49,21 @@ class WindowsEndpoint : public EventEngine::Endpoint {
absl::AnyInvocable<void(absl::Status)> cb);
// Resets the per-request data
void Reset();
// Run the callback with whatever data is available, and reset state.
//
// Returns true if the callback has been called with some data. Returns
// false if no data has been read.
bool MaybeFinishIfDataHasAlreadyBeenRead();
// Execute the callback and reset.
void ExecuteCallbackAndReset(absl::Status status);
// Swap any leftover slices into the provided buffer
void DonateSpareSlices(SliceBuffer* buffer);
private:
std::shared_ptr<AsyncIOState> io_state_;
absl::AnyInvocable<void(absl::Status)> cb_;
SliceBuffer* buffer_ = nullptr;
SliceBuffer last_read_buffer_;
};
// Permanent closure type for Write callbacks
@ -86,6 +96,10 @@ class WindowsEndpoint : public EventEngine::Endpoint {
HandleWriteClosure handle_write_event;
};
// Perform the low-level calls and execute the HandleReadClosure
// asynchronously.
absl::Status DoTcpRead(SliceBuffer* buffer);
EventEngine::ResolvedAddress peer_address_;
std::string peer_address_string_;
EventEngine::ResolvedAddress local_address_;

@ -354,7 +354,7 @@ WindowsEventEngineListener::AddSinglePortSocketListener(
"the Listener was starting. This is invalid usage, all ports must "
"be bound before the Listener is started.",
this);
single_port_listener_ptr->Start();
GRPC_RETURN_IF_ERROR(single_port_listener_ptr->Start());
}
return single_port_listener_ptr;
}

Loading…
Cancel
Save