[c-ares] fix spin loop bug when c-ares gives up on a socket that still has data left in its read buffer (#34185)

If we get a readable event on an fd and both the following happens:

- c-ares does *not* read all bytes off the fd

- c-ares removes the fd from the set ARES_GETSOCK_READABLE

... then we have a busy loop here, where we'd keep asking c-ares to
process an fd that it no longer cares about.

This is indirectly related to a change in this code one month ago:
https://github.com/grpc/grpc/pull/33942 - before that change, c-ares
would close the socket when it called
[handle_error](7f3262312f/src/lib/ares_process.c (L707))
and so `IsFdStillReadableLocked` would start returning `false`, causing
us to get away with [this
loop](f6a994229e/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc (L371)).
Now, because `IsFdStillReadableLocked` will keep returning true (because
of our overridden `close` API), we'll loop forever.
pull/34205/head
apolcyn 1 year ago committed by GitHub
parent e5d7eec584
commit a35f282d58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  2. 22
      test/core/util/fake_udp_and_tcp_server.cc
  3. 3
      test/core/util/fake_udp_and_tcp_server.h
  4. 66
      test/cpp/naming/cancel_ares_query_test.cc

@ -366,9 +366,7 @@ static void on_readable(void* arg, grpc_error_handle error) {
GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
if (error.ok() && !ev_driver->shutting_down) {
do {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} else {
// If error is not absl::OkStatus() or the resolution was cancelled, it
// means the fd has been shutdown or timed out. The pending lookups made on
@ -438,12 +436,21 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver)
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver);
GRPC_CARES_TRACE_LOG("request:%p notify read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn,
grpc_schedule_on_exec_ctx);
fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure);
if (fdn->grpc_polled_fd->IsFdStillReadableLocked()) {
GRPC_CARES_TRACE_LOG("request:%p schedule direct read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &fdn->read_closure,
absl::OkStatus());
} else {
GRPC_CARES_TRACE_LOG("request:%p notify read on: %s",
ev_driver->request,
fdn->grpc_polled_fd->GetName());
fdn->grpc_polled_fd->RegisterForOnReadableLocked(
&fdn->read_closure);
}
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure

@ -210,6 +210,28 @@ FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
}
FakeUdpAndTcpServer::ProcessReadResult
FakeUdpAndTcpServer::SendThreeAllZeroBytes(int bytes_received_size,
int read_error, int s) {
if (bytes_received_size < 0 && !ErrorIsRetryable(read_error)) {
gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
read_error);
GPR_ASSERT(0);
}
if (bytes_received_size == 0) {
// The peer has shut down the connection.
gpr_log(GPR_DEBUG, "Fake TCP server received 0 bytes from peer socket: %d.",
s);
return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
}
char buf[3] = {0, 0, 0};
int bytes_sent = send(s, buf, sizeof(buf), 0);
gpr_log(GPR_DEBUG,
"Fake TCP server sent %d all-zero bytes on peer socket: %d.",
bytes_sent, s);
return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
}
FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
: fd_(fd) {}

@ -96,6 +96,9 @@ class FakeUdpAndTcpServer {
static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
int read_error, int s);
static ProcessReadResult SendThreeAllZeroBytes(int bytes_received_size,
int read_error, int s);
void ReadFromUdpSocket();
// Run a loop that periodically, every 10 ms:

@ -471,6 +471,72 @@ TEST_F(CancelDuringAresQuery, TestQueryFailsBecauseTcpServerClosesSocket) {
}
}
// This test is meant to repro a bug noticed in internal issue b/297538255.
// The general issue is the loop in
// https://github.com/grpc/grpc/blob/f6a994229e72bc771963706de7a0cd8aa9150bb6/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc#L371.
// The problem with that loop is that c-ares *can* in certain situations stop
// caring about the fd being processed without reading all of the data out of
// the read buffer. In that case, we keep looping because
// IsFdStillReadableLocked() keeps returning true, but we never make progress.
// Meanwhile, we are holding a lock which prevents cancellation or timeouts from
// kicking in, and thus we spin-loop forever.
//
// At the time of writing, this test case illustrates one way to hit that bug.
// It works as follows:
// 1) We force c-ares to use TCP for its DNS queries
// 2) We stand up a fake DNS server that, for each incoming connection, sends
// three all-zero bytes and then closes the socket.
// 3) When the c-ares library receives the three-zero-byte response from the
// DNS server, it parses the first two-bytes as a length field:
// https://github.com/c-ares/c-ares/blob/6360e96b5cf8e5980c887ce58ef727e53d77243a/src/lib/ares_process.c#L410.
// 4) Because the first two bytes were zero, c-ares attempts to malloc a
// zero-length buffer:
// https://github.com/c-ares/c-ares/blob/6360e96b5cf8e5980c887ce58ef727e53d77243a/src/lib/ares_process.c#L428.
// 5) Because c-ares' default_malloc(0) returns NULL
// (https://github.com/c-ares/c-ares/blob/7f3262312f246556d8c1bdd8ccc1844847f42787/src/lib/ares_library_init.c#L38),
// c-ares invokes handle_error and stops reading on the socket:
// https://github.com/c-ares/c-ares/blob/6360e96b5cf8e5980c887ce58ef727e53d77243a/src/lib/ares_process.c#L430.
// 6) Because we overwrite the socket "close" method, c-ares attempt to close
// the socket in handle_error does nothing except for removing the socket
// from ARES_GETSOCK_READABLE:
// https://github.com/grpc/grpc/blob/f6a994229e72bc771963706de7a0cd8aa9150bb6/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc#L156.
// 7) Because there is still one byte left in the TCP read buffer,
// IsFdStillReadableLocked will keep returning true:
// https://github.com/grpc/grpc/blob/f6a994229e72bc771963706de7a0cd8aa9150bb6/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc#L82.
// But c-ares will never try to read from that socket again, so we have an
// infinite busy loop.
TEST_F(CancelDuringAresQuery, TestQueryFailsWithDataRemainingInReadBuffer) {
#ifdef GPR_WINDOWS
GTEST_SKIP() << "TODO(apolcyn): try to unskip this test on windows after "
"https://github.com/grpc/grpc/pull/33965";
return;
#endif
if (grpc_core::IsEventEngineDnsEnabled()) {
g_event_engine_grpc_ares_test_only_force_tcp = true;
} else {
g_grpc_ares_test_only_force_tcp = true;
}
grpc_core::testing::SocketUseAfterCloseDetector
socket_use_after_close_detector;
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::SendThreeAllZeroBytes);
grpc_status_code expected_status_code = GRPC_STATUS_UNAVAILABLE;
// Don't really care about the deadline - we'll hit a DNS
// resolution failure quickly in any case.
gpr_timespec rpc_deadline = grpc_timeout_seconds_to_deadline(100);
int dns_query_timeout_ms = -1; // don't set query timeout
TestCancelDuringActiveQuery(
expected_status_code, "" /* expected error message substring */,
rpc_deadline, dns_query_timeout_ms, fake_dns_server.port());
if (grpc_core::IsEventEngineDnsEnabled()) {
g_event_engine_grpc_ares_test_only_force_tcp = false;
} else {
g_grpc_ares_test_only_force_tcp = false;
}
}
} // namespace
int main(int argc, char** argv) {

Loading…
Cancel
Save