[EventEngine] Workaround for missing data bug on endpoint/socket shutdown (#38014)

There was an edge case in which a socket or endpoint was shut down, a socket `read` call returned zero bytes, and there was unread data in the read buffer from a previous read operation. The endpoint callbacks were called with an error status to indicate the end of the stream, and the callbacks did not consume that final chunk of data.

My current hunch is that something inside gRPC is violating the EventEngine `Endpoint::Read` contract, but I'm not yet certain what. See include/grpc/event_engine/event_engine.h (L197-L199) at commit 88b5c9e3ab.

However, by modifying WindowsEndpoint to return an `absl::OkStatus()` if there's any data in the buffer, tests appear to pass.

Closes #38014

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38014 from drfloob:win-endpoint-data-leak b24b2d9f8a
PiperOrigin-RevId: 691063044
pull/38021/head
AJ Heller 5 months ago committed by Copybara-Service
parent cdac698f75
commit 7e5dc145db
  1. 30
      src/core/lib/event_engine/windows/windows_endpoint.cc
  2. 3
      test/core/end2end/end2end_tests.cc

@ -42,7 +42,8 @@ void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
for (size_t i = 0; i < buffer->Count(); i++) {
auto str = buffer->MutableSliceAt(i).as_string_view();
GRPC_TRACE_LOG(event_engine_endpoint, INFO)
<< context_string << ": " << str;
<< context_string << " [" << i + 1 << "/" << buffer->Count()
<< "]: " << str;
}
}
@ -78,7 +79,7 @@ WindowsEndpoint::~WindowsEndpoint() {
void WindowsEndpoint::AsyncIOState::DoTcpRead(SliceBuffer* buffer) {
GRPC_TRACE_LOG(event_engine_endpoint, INFO)
<< "WindowsEndpoint::" << endpoint << " reading";
<< "WindowsEndpoint::" << endpoint << " attempting a read";
if (socket->IsShutdown()) {
socket->read_info()->SetErrorStatus(
absl::InternalError("Socket is shutting down."));
@ -294,15 +295,23 @@ void WindowsEndpoint::HandleReadClosure::Run() {
return ResetAndReturnCallback()(status);
}
if (result.bytes_transferred == 0) {
DCHECK_GT(io_state.use_count(), 0);
// Either the endpoint is shut down or we've seen the end of the stream
if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
DumpSliceBuffer(buffer_, absl::StrFormat("WindowsEndpoint::%p READ",
io_state->endpoint));
LOG(INFO) << "WindowsEndpoint::" << this << " read 0 bytes.";
DumpSliceBuffer(
&last_read_buffer_,
absl::StrFormat("WindowsEndpoint::%p READ last_read_buffer_: ",
io_state->endpoint));
}
status = absl::InternalError("End of TCP stream");
grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
buffer_->Swap(last_read_buffer_);
if (buffer_->Length() == 0) {
// Only send an error if there is no more data to consume. If the endpoint
// or socket is shut down, the next read will discover that.
status = absl::InternalError("End of TCP stream");
grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
}
return ResetAndReturnCallback()(status);
}
DCHECK_GT(result.bytes_transferred, 0);
@ -320,8 +329,13 @@ void WindowsEndpoint::HandleReadClosure::Run() {
bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
if (last_read_buffer_.Length() > 0) {
GRPC_TRACE_LOG(event_engine_endpoint, INFO)
<< "WindowsEndpoint::" << io_state_->endpoint
<< " finishing a synchronous read";
buffer_->Swap(last_read_buffer_);
// Captures io_state_ to ensure it remains alive until the callback is run.
if (GRPC_TRACE_FLAG_ENABLED(event_engine_endpoint_data)) {
DumpSliceBuffer(buffer_, "finishing synchronous read");
}
io_state_->thread_pool->Run(
[cb = ResetAndReturnCallback()]() mutable { cb(absl::OkStatus()); });
return true;

@ -65,9 +65,6 @@ void CoreEnd2endTest::SetUp() {
CoreConfiguration::Reset();
initialized_ = false;
grpc_prewarm_os_for_tests();
#ifdef GPR_WINDOWS
GTEST_SKIP() << "Disabled on Windows due to high flake rate";
#endif
}
void CoreEnd2endTest::TearDown() {

Loading…
Cancel
Save