[c-ares DNS resolver] Fix file descriptor use-after-close bug when c-ares writes succeed but subsequent read fails (#33871)

Normally, c-ares related fds are destroyed after all DNS resolution is
finished in [this code
path](c82d31677a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc (L210)).
Also there are some fds that c-ares may fail to open or write to
initially, and c-ares will close them internally before grpc ever knows
about them.

But if:
1) c-ares opens a socket and successfully writes a request on it
2) then a subsequent read fails

Then c-ares will close the fd in [this code
path](bad62225b7/src/lib/ares_process.c (L740)),
but gRPC will have a reference on the fd and will still use it
afterwards.

The fix here is to leverage the c-ares socket-override API to properly track
fd ownership between c-ares and gRPC.

Related: internal issue b/292203138
pull/33929/head
apolcyn 2 years ago committed by GitHub
parent 98104bbc3c
commit 76203ba589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CMakeLists.txt
  2. 2
      build_autogenerated.yaml
  3. 112
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  4. 5
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  5. 3
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  6. 28
      test/core/util/BUILD
  7. 199
      test/core/util/socket_use_after_close_detector.cc
  8. 56
      test/core/util/socket_use_after_close_detector.h
  9. 1
      test/cpp/naming/BUILD
  10. 149
      test/cpp/naming/cancel_ares_query_test.cc
  11. 1
      test/cpp/naming/generate_resolver_component_tests.bzl
  12. 153
      test/cpp/naming/resolver_component_test.cc

1
CMakeLists.txt generated

@ -7661,6 +7661,7 @@ if(gRPC_BUILD_TESTS)
add_executable(cancel_ares_query_test
test/core/end2end/cq_verifier.cc
test/core/util/fake_udp_and_tcp_server.cc
test/core/util/socket_use_after_close_detector.cc
test/cpp/naming/cancel_ares_query_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc

@ -5495,9 +5495,11 @@ targets:
headers:
- test/core/end2end/cq_verifier.h
- test/core/util/fake_udp_and_tcp_server.h
- test/core/util/socket_use_after_close_detector.h
src:
- test/core/end2end/cq_verifier.cc
- test/core/util/fake_udp_and_tcp_server.cc
- test/core/util/socket_use_after_close_detector.cc
- test/cpp/naming/cancel_ares_query_test.cc
deps:
- grpc++_test_config

@ -17,28 +17,36 @@
//
#include <grpc/support/port_platform.h>
#include <memory>
#include <string>
#include "absl/base/thread_annotations.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <string.h>
#include <ares_build.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <ares.h>
#include "absl/base/thread_annotations.h"
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
namespace grpc_core {
@ -98,12 +106,94 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
/// Posix implementation of GrpcPolledFdFactory.
///
/// Besides creating GrpcPolledFdPosix objects, this factory installs
/// socket-function overrides on the c-ares channel so that it can track which
/// fds have been handed to grpc (owned_fds_). Fds in that set are closed by
/// this factory's destructor instead of by c-ares.
class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
public:
/// Closes every fd that was recorded in owned_fds_.
~GrpcPolledFdFactoryPosix() override {
for (auto& fd : owned_fds_) {
close(fd);
}
}
/// Records `as` in owned_fds_ and returns a newly allocated
/// GrpcPolledFdPosix wrapping it. Asserts that `as` was not already recorded.
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set) override {
auto insert_result = owned_fds_.insert(as);
// insert_result.second is false if the fd was already in the set.
GPR_ASSERT(insert_result.second);
return new GrpcPolledFdPosix(as, driver_pollset_set);
}
// NOTE(review): the empty definition on the next line has the same signature
// as the one that follows it; presumably it is the pre-change version left
// behind by the diff rendering - confirm before compiling this text.
void ConfigureAresChannelLocked(ares_channel /*channel*/) override {}
/// Installs the socket-function overrides (kSockFuncs, with `this` as user
/// data) and the socket-configure callback (with no user data) on `channel`.
void ConfigureAresChannelLocked(ares_channel channel) override {
ares_set_socket_functions(channel, &kSockFuncs, this);
ares_set_socket_configure_callback(
channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr);
}
private:
/// Overridden socket API for c-ares; forwards directly to socket().
static ares_socket_t Socket(int af, int type, int protocol,
void* /*user_data*/) {
return socket(af, type, protocol);
}
/// Overridden connect API for c-ares; forwards directly to connect().
static int Connect(ares_socket_t as, const struct sockaddr* target,
ares_socklen_t target_len, void* /*user_data*/) {
return connect(as, target, target_len);
}
/// Overridden writev API for c-ares; forwards directly to writev().
static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov,
int iovec_count, void* /*user_data*/) {
return writev(as, iov, iovec_count);
}
/// Overridden recvfrom API for c-ares; forwards directly to recvfrom().
static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
int flags, struct sockaddr* from,
ares_socklen_t* from_len, void* /*user_data*/) {
return recvfrom(as, data, data_len, flags, from, from_len);
}
/// Overridden close API for c-ares.
/// `user_data` is the GrpcPolledFdFactoryPosix registered in
/// ConfigureAresChannelLocked. If `as` is not in owned_fds_ it is closed
/// here and the result of close() is returned. Otherwise the fd is left
/// open (the destructor closes it) and 0 is returned.
static int Close(ares_socket_t as, void* user_data) {
GrpcPolledFdFactoryPosix* self =
static_cast<GrpcPolledFdFactoryPosix*>(user_data);
if (self->owned_fds_.find(as) == self->owned_fds_.end()) {
// c-ares owns this fd, grpc has never seen it
return close(as);
}
// grpc owns this fd: report success without closing it.
return 0;
}
/// Because we're using socket API overrides, c-ares won't
/// perform its typical configuration on the socket. See
/// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018.
/// So we use the configure socket callback override and copy default
/// settings that c-ares would normally apply on posix platforms:
/// - non-blocking
/// - cloexec flag
/// - disable nagle (applied to SOCK_STREAM sockets only)
///
/// Returns 0 on success, or -1 as soon as any of the settings fails.
static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) {
grpc_error_handle err;
err = grpc_set_socket_nonblocking(fd, true);
if (!err.ok()) return -1;
err = grpc_set_socket_cloexec(fd, true);
if (!err.ok()) return -1;
if (type == SOCK_STREAM) {
err = grpc_set_socket_low_latency(fd, true);
if (!err.ok()) return -1;
}
return 0;
}
// Table of socket-function overrides passed to ares_set_socket_functions().
const struct ares_socket_functions kSockFuncs = {
&GrpcPolledFdFactoryPosix::Socket /* socket */,
&GrpcPolledFdFactoryPosix::Close /* close */,
&GrpcPolledFdFactoryPosix::Connect /* connect */,
&GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */,
&GrpcPolledFdFactoryPosix::WriteV /* writev */,
};
// fds that are used/owned by grpc - we (grpc) will close them rather than
// c-ares
std::unordered_set<ares_socket_t> owned_fds_;
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* /* mu */) {

@ -515,6 +515,8 @@ static void noop_inject_channel_config(ares_channel* /*channel*/) {}
void (*grpc_ares_test_only_inject_config)(ares_channel* channel) =
noop_inject_channel_config;
bool g_grpc_ares_test_only_force_tcp = false;
grpc_error_handle grpc_ares_ev_driver_create_locked(
grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
int query_timeout_ms, grpc_ares_request* request)
@ -523,6 +525,9 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
if (g_grpc_ares_test_only_force_tcp) {
opts.flags |= ARES_FLAG_USEVC;
}
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
grpc_ares_test_only_inject_config(&(*ev_driver)->channel);
GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request);

@ -133,4 +133,7 @@ void grpc_cares_wrapper_address_sorting_sort(
// Exposed in this header for C-core tests only
extern void (*grpc_ares_test_only_inject_config)(ares_channel* channel);
// Exposed in this header for C-core tests only
extern bool g_grpc_ares_test_only_force_tcp;
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H

@ -363,6 +363,34 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "socket_use_after_close_detector",
srcs = ["socket_use_after_close_detector.cc"],
hdrs = ["socket_use_after_close_detector.h"],
external_deps = ["gtest"],
language = "C++",
deps = [
"grpc_test_util",
"//:gpr",
"//:grpc",
"//src/core:grpc_sockaddr",
],
)
grpc_cc_library(
name = "socket_use_after_close_detector_unsecure",
srcs = ["socket_use_after_close_detector.cc"],
hdrs = ["socket_use_after_close_detector.h"],
external_deps = ["gtest"],
language = "C++",
deps = [
"grpc_test_util_unsecure",
"//:gpr",
"//:grpc",
"//src/core:grpc_sockaddr",
],
)
grpc_cc_library(
name = "build",
srcs = ["build.cc"],

@ -0,0 +1,199 @@
//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "test/core/util/socket_use_after_close_detector.h"
#include <errno.h>
#include <fcntl.h>
#include <string.h>
// IWYU pragma: no_include <arpa/inet.h>
// IWYU pragma: no_include <unistd.h>
#include <algorithm>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/util/port.h"
// TODO(unknown): pull in different headers when enabling this
// test on windows. Also set BAD_SOCKET_RETURN_VAL
// to INVALID_SOCKET on windows.
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
#else
#define BAD_SOCKET_RETURN_VAL (-1)
#endif
namespace {
#ifdef GPR_WINDOWS
void OpenAndCloseSocketsStressLoop(int port, gpr_event* done_ev) {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port);
((char*)&addr.sin6_addr)[15] = 1;
for (;;) {
if (gpr_event_get(done_ev)) {
return;
}
std::vector<int> sockets;
for (size_t i = 0; i < 50; i++) {
SOCKET s = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
WSA_FLAG_OVERLAPPED);
ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
<< "Failed to create TCP ipv6 socket";
char val = 1;
ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
SOCKET_ERROR)
<< "Failed to set socketopt reuseaddr. WSA error: " +
std::to_string(WSAGetLastError());
ASSERT_TRUE(grpc_tcp_set_non_block(s) == absl::OkStatus())
<< "Failed to set socket non-blocking";
ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
<< "Failed to bind socket " + std::to_string(s) +
" to [::1]:" + std::to_string(port) +
". WSA error: " + std::to_string(WSAGetLastError());
ASSERT_TRUE(listen(s, 1) != SOCKET_ERROR)
<< "Failed to listen on socket " + std::to_string(s) +
". WSA error: " + std::to_string(WSAGetLastError());
sockets.push_back(s);
}
// Do a non-blocking accept followed by a close on all of those sockets.
// Do this in a separate loop to try to induce a time window to hit races.
for (size_t i = 0; i < sockets.size(); i++) {
ASSERT_TRUE(accept(sockets[i], nullptr, nullptr) == INVALID_SOCKET)
<< "Accept on phony socket unexpectedly accepted actual connection.";
ASSERT_TRUE(WSAGetLastError() == WSAEWOULDBLOCK)
<< "OpenAndCloseSocketsStressLoop accept on socket " +
std::to_string(sockets[i]) +
" failed in "
"an unexpected way. "
"WSA error: " +
std::to_string(WSAGetLastError()) +
". Socket use-after-close bugs are likely.";
ASSERT_TRUE(closesocket(sockets[i]) != SOCKET_ERROR)
<< "Failed to close socket: " + std::to_string(sockets[i]) +
". WSA error: " + std::to_string(WSAGetLastError());
}
}
return;
}
#else
// Posix flavor of the socket stress loop.
//
// The goal of this loop is to catch socket "use after close" bugs within the
// c-ares resolver by acting like some separate thread doing I/O.
// Its goal is to try to hit race conditions whereby:
//   1) The c-ares resolver closes a socket.
//   2) This loop opens a socket with (coincidentally) the same handle.
//   3) The c-ares resolver mistakenly uses that same socket without
//      realizing that it's closed.
//   4) This loop performs an operation on that socket that should
//      succeed but instead fails because of what the c-ares
//      resolver did in the meantime.
//
// Runs until `done_ev` is set. `port` is the port on [::1] that every socket
// is bound to. Unexpected failures are reported via gtest assertions.
void OpenAndCloseSocketsStressLoop(int port, gpr_event* done_ev) {
  sockaddr_in6 addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin6_family = AF_INET6;
  addr.sin6_port = htons(port);
  // Set the last byte of the address to 1, i.e. the loopback address ::1.
  (reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
  for (;;) {
    if (gpr_event_get(done_ev)) {
      return;
    }
    std::vector<int> sockets;
    // First open a bunch of sockets, bind and listen
    // '50' is an arbitrary number that, experimentally,
    // has a good chance of catching bugs.
    for (size_t i = 0; i < 50; i++) {
      int s = socket(AF_INET6, SOCK_STREAM, 0);
      // Validate the descriptor before handing it to any other syscall, so
      // that a failed socket() is reported as such.
      ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
          << "Failed to create TCP ipv6 socket";
      int val = 1;
      ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) ==
                  0)
          << "Failed to set socketopt reuseport";
      ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
                  0)
          << "Failed to set socket reuseaddr";
      ASSERT_TRUE(fcntl(s, F_SETFL, O_NONBLOCK) == 0)
          << "Failed to set socket non-blocking";
      ASSERT_TRUE(bind(s, reinterpret_cast<const sockaddr*>(&addr),
                       sizeof(addr)) == 0)
          << "Failed to bind socket " + std::to_string(s) +
                 " to [::1]:" + std::to_string(port) +
                 ". errno: " + std::to_string(errno);
      ASSERT_TRUE(listen(s, 1) == 0) << "Failed to listen on socket " +
                                            std::to_string(s) +
                                            ". errno: " + std::to_string(errno);
      sockets.push_back(s);
    }
    // Do a non-blocking accept followed by a close on all of those sockets.
    // Do this in a separate loop to try to induce a time window to hit races.
    for (size_t i = 0; i < sockets.size(); i++) {
      const int accepted_fd = accept(sockets[i], nullptr, nullptr);
      if (accepted_fd < 0) {
        // Snapshot errno right away so that both the check and the message
        // describe the accept() failure.
        const int accept_errno = errno;
        // If e.g. a "shutdown" was called on this fd from another thread,
        // then this accept call should fail with an unexpected error.
        ASSERT_TRUE(accept_errno == EAGAIN || accept_errno == EWOULDBLOCK)
            << "OpenAndCloseSocketsStressLoop accept on socket " +
                   std::to_string(sockets[i]) +
                   " failed in "
                   "an unexpected way. "
                   "errno: " +
                   std::to_string(accept_errno) +
                   ". Socket use-after-close bugs are likely.";
      } else {
        // accept() succeeded, so errno carries no information about this
        // call. Just release the accepted connection so it does not leak.
        close(accepted_fd);
      }
      ASSERT_TRUE(close(sockets[i]) == 0)
          << "Failed to close socket: " + std::to_string(sockets[i]) +
                 ". errno: " + std::to_string(errno);
    }
  }
}
#endif
} // namespace
namespace grpc_core {
namespace testing {
// Initializes the "done" event, picks an unused port and launches the
// background thread that runs OpenAndCloseSocketsStressLoop on that port.
SocketUseAfterCloseDetector::SocketUseAfterCloseDetector() {
  gpr_event_init(&done_ev_);
  const int stress_port = grpc_pick_unused_port_or_die();
  gpr_event* const stop_signal = &done_ev_;
  thread_ = std::make_unique<std::thread>(OpenAndCloseSocketsStressLoop,
                                          stress_port, stop_signal);
}
// Signals the background stress thread to stop and waits for it to finish.
SocketUseAfterCloseDetector::~SocketUseAfterCloseDetector() {
  // Any non-null value marks the event as set; the loop only tests for that.
  void* const done_marker = reinterpret_cast<void*>(1);
  gpr_event_set(&done_ev_, done_marker);
  thread_->join();
}
} // namespace testing
} // namespace grpc_core

@ -0,0 +1,56 @@
//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_TEST_CORE_UTIL_SOCKET_USE_AFTER_CLOSE_DETECTOR_H
#define GRPC_TEST_CORE_UTIL_SOCKET_USE_AFTER_CLOSE_DETECTOR_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <thread>
#include <grpc/support/sync_generic.h>
namespace grpc_core {
namespace testing {
// This class is meant to detect file descriptor use-after-close
// bugs occurring somewhere in the program while the object is alive.
// The implementation currently uses a background thread to open
// and close sockets in a loop, catching socket use-after-close bugs
// by watching them manifest as unexpected socket operation failures.
//
// Note: this will not give false positives but may give false negatives.
// That said this seems to be fairly reliable at finding use-after-close
// bugs, at least on linux, because of fd handles being quickly reused.
// For example this was able to catch the use-after-close bug from
// https://github.com/grpc/grpc/pull/33871 "almost every time".
class SocketUseAfterCloseDetector {
public:
// Starts the background thread that opens and closes sockets in a loop.
SocketUseAfterCloseDetector();
// Signals the background thread to stop and joins it.
~SocketUseAfterCloseDetector();
private:
// Background thread running the open/close socket stress loop.
std::unique_ptr<std::thread> thread_;
// Set by the destructor to tell the background thread to exit.
gpr_event done_ev_;
};
} // namespace testing
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_SOCKET_USE_AFTER_CLOSE_DETECTOR_H

@ -47,6 +47,7 @@ grpc_cc_test(
"//test/core/end2end:cq_verifier",
"//test/core/util:fake_udp_and_tcp_server",
"//test/core/util:grpc_test_util",
"//test/core/util:socket_use_after_close_detector",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],

@ -33,6 +33,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/config/core_configuration.h"
@ -54,6 +55,7 @@
#include "test/core/util/cmdline.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/port.h"
#include "test/core/util/socket_use_after_close_detector.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/test_config.h"
@ -277,71 +279,23 @@ TEST_F(CancelDuringAresQuery, TestFdsAreDeletedFromPollsetSet) {
grpc_pollset_set_destroy(fake_other_pollset_set);
}
// Settings for TestCancelDuringActiveQuery test
typedef enum {
NONE,
SHORT,
ZERO,
} cancellation_test_query_timeout_setting;
std::string kFakeName = "dont-care-since-wont-be-resolved.test.com:1234";
void TestCancelDuringActiveQuery(
cancellation_test_query_timeout_setting query_timeout_setting) {
// Start up fake non responsive DNS server
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
grpc_status_code expected_status_code,
absl::string_view expected_error_message_substring,
gpr_timespec rpc_deadline, int dns_query_timeout_ms,
int fake_dns_server_port) {
// Create a call that will try to use the fake DNS server
std::string name = "dont-care-since-wont-be-resolved.test.com:1234";
std::string client_target =
absl::StrFormat("dns://[::1]:%d/%s", fake_dns_server.port(), name);
gpr_log(GPR_DEBUG, "TestCancelActiveDNSQuery. query timeout setting: %d",
query_timeout_setting);
absl::StrFormat("dns://[::1]:%d/%s", fake_dns_server_port, kFakeName);
grpc_channel_args* client_args = nullptr;
grpc_status_code expected_status_code = GRPC_STATUS_OK;
std::string expected_error_message_substring;
gpr_timespec rpc_deadline;
if (query_timeout_setting == NONE) {
// The RPC deadline should go off well before the DNS resolution
// timeout fires.
expected_status_code = GRPC_STATUS_DEADLINE_EXCEEDED;
// use default DNS resolution timeout (which is over one minute).
client_args = nullptr;
rpc_deadline = grpc_timeout_milliseconds_to_deadline(100);
} else if (query_timeout_setting == SHORT) {
// The DNS resolution timeout should fire well before the
// RPC's deadline expires.
expected_status_code = GRPC_STATUS_UNAVAILABLE;
if (grpc_core::IsEventEngineDnsEnabled()) {
expected_error_message_substring =
absl::StrCat("errors resolving ", name);
} else {
expected_error_message_substring =
absl::StrCat("DNS resolution failed for ", name);
}
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS);
arg.value.integer =
1; // Set this shorter than the call deadline so that it goes off.
client_args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
// Set the deadline high enough such that if we hit this and get
// a deadline exceeded status code, then we are confident that there's
// a bug causing cancellation of DNS resolutions to not happen in a timely
// manner.
rpc_deadline = grpc_timeout_seconds_to_deadline(10);
} else if (query_timeout_setting == ZERO) {
// The RPC deadline should go off well before the DNS resolution
// timeout fires.
expected_status_code = GRPC_STATUS_DEADLINE_EXCEEDED;
if (dns_query_timeout_ms >= 0) {
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS);
arg.value.integer = 0; // Set this to zero to disable query timeouts.
arg.value.integer = dns_query_timeout_ms;
client_args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
rpc_deadline = grpc_timeout_milliseconds_to_deadline(100);
} else {
abort();
}
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
grpc_channel* client =
@ -413,19 +367,96 @@ void TestCancelDuringActiveQuery(
TEST_F(CancelDuringAresQuery,
TestHitDeadlineAndDestroyChannelDuringAresResolutionIsGraceful) {
TestCancelDuringActiveQuery(NONE /* don't set query timeouts */);
grpc_core::testing::SocketUseAfterCloseDetector
socket_use_after_close_detector;
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
grpc_status_code expected_status_code = GRPC_STATUS_DEADLINE_EXCEEDED;
// The RPC deadline should go off well before the DNS resolution
// timeout fires.
gpr_timespec rpc_deadline = grpc_timeout_milliseconds_to_deadline(100);
int dns_query_timeout_ms = -1; // don't set query timeout
TestCancelDuringActiveQuery(
expected_status_code, "" /* expected error message substring */,
rpc_deadline, dns_query_timeout_ms, fake_dns_server.port());
}
TEST_F(
CancelDuringAresQuery,
TestHitDeadlineAndDestroyChannelDuringAresResolutionWithQueryTimeoutIsGraceful) {
TestCancelDuringActiveQuery(SHORT /* set short query timeout */);
grpc_core::testing::SocketUseAfterCloseDetector
socket_use_after_close_detector;
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
grpc_status_code expected_status_code = GRPC_STATUS_UNAVAILABLE;
std::string expected_error_message_substring;
if (grpc_core::IsEventEngineDnsEnabled()) {
expected_error_message_substring =
absl::StrCat("errors resolving ", kFakeName);
} else {
expected_error_message_substring =
absl::StrCat("DNS resolution failed for ", kFakeName);
}
// The DNS resolution timeout should fire well before the
// RPC's deadline expires.
gpr_timespec rpc_deadline = grpc_timeout_seconds_to_deadline(10);
int dns_query_timeout_ms = 1;
TestCancelDuringActiveQuery(expected_status_code,
expected_error_message_substring, rpc_deadline,
dns_query_timeout_ms, fake_dns_server.port());
}
TEST_F(
CancelDuringAresQuery,
TestHitDeadlineAndDestroyChannelDuringAresResolutionWithZeroQueryTimeoutIsGraceful) {
TestCancelDuringActiveQuery(ZERO /* disable query timeouts */);
grpc_core::testing::SocketUseAfterCloseDetector
socket_use_after_close_detector;
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
grpc_status_code expected_status_code = GRPC_STATUS_DEADLINE_EXCEEDED;
// The RPC deadline should go off well before the DNS resolution
// timeout fires.
gpr_timespec rpc_deadline = grpc_timeout_milliseconds_to_deadline(100);
int dns_query_timeout_ms = 0; // disable query timeouts
TestCancelDuringActiveQuery(
expected_status_code, "" /* expected error message substring */,
rpc_deadline, dns_query_timeout_ms, fake_dns_server.port());
}
// Regression test for the socket use-after-close bug described in
// https://github.com/grpc/grpc/pull/33871.
//
// c-ares is forced onto TCP and pointed at a fake server that closes the
// connection as soon as it receives bytes. The connect and write therefore
// succeed while the following read fails, which is the sequence that
// triggered the bug. The detector runs in the background for the whole test.
TEST_F(CancelDuringAresQuery, TestQueryFailsBecauseTcpServerClosesSocket) {
  using grpc_core::testing::FakeUdpAndTcpServer;
  grpc_core::testing::SocketUseAfterCloseDetector
      socket_use_after_close_detector;
  FakeUdpAndTcpServer fake_dns_server(
      FakeUdpAndTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
      FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer);
  // TODO(yijiem): make this test work with the EE DNS resolver by supporting
  // this test flag to force TCP in the EE DNS resolver.
  if (grpc_core::IsEventEngineDnsEnabled()) {
    return;
  }
  g_grpc_ares_test_only_force_tcp = true;
  const grpc_status_code expected_status_code = GRPC_STATUS_UNAVAILABLE;
  const std::string expected_error_message_substring =
      absl::StrCat("DNS resolution failed for ", kFakeName);
  // The deadline is deliberately generous: the DNS resolution failure is
  // expected to surface long before it.
  const gpr_timespec rpc_deadline = grpc_timeout_seconds_to_deadline(100);
  // A negative value leaves the query timeout unset.
  const int dns_query_timeout_ms = -1;
  TestCancelDuringActiveQuery(expected_status_code,
                              expected_error_message_substring, rpc_deadline,
                              dns_query_timeout_ms, fake_dns_server.port());
  g_grpc_ares_test_only_force_tcp = false;
}
} // namespace

@ -59,6 +59,7 @@ def generate_resolver_component_tests():
"//test/cpp/util:test_util%s" % unsecure_build_config_suffix,
"//test/core/util:grpc_test_util%s" % unsecure_build_config_suffix,
"//test/core/util:fake_udp_and_tcp_server%s" % unsecure_build_config_suffix,
"//test/core/util:socket_use_after_close_detector%s" % unsecure_build_config_suffix,
"//:grpc++%s" % unsecure_build_config_suffix,
"//:grpc%s" % unsecure_build_config_suffix,
"//:gpr",

@ -64,23 +64,11 @@
#include "src/core/lib/resolver/server_address.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/port.h"
#include "test/core/util/socket_use_after_close_detector.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/subprocess.h"
#include "test/cpp/util/test_config.h"
// TODO(unknown): pull in different headers when enabling this
// test on windows. Also set BAD_SOCKET_RETURN_VAL
// to INVALID_SOCKET on windows.
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
#else
#include "src/core/lib/iomgr/sockaddr_posix.h"
#define BAD_SOCKET_RETURN_VAL (-1)
#endif
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using std::vector;
using testing::UnorderedElementsAreArray;
@ -303,134 +291,6 @@ void CheckLBPolicyResultLocked(const grpc_core::ChannelArgs channel_args,
}
}
#ifdef GPR_WINDOWS
void OpenAndCloseSocketsStressLoop(int phony_port, gpr_event* done_ev) {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(phony_port);
((char*)&addr.sin6_addr)[15] = 1;
for (;;) {
if (gpr_event_get(done_ev)) {
return;
}
std::vector<int> sockets;
for (size_t i = 0; i < 50; i++) {
SOCKET s = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
WSA_FLAG_OVERLAPPED);
ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
<< "Failed to create TCP ipv6 socket";
gpr_log(GPR_DEBUG, "Opened socket: %d", s);
char val = 1;
ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
SOCKET_ERROR)
<< "Failed to set socketopt reuseaddr. WSA error: " +
std::to_string(WSAGetLastError());
ASSERT_TRUE(grpc_tcp_set_non_block(s) == absl::OkStatus())
<< "Failed to set socket non-blocking";
ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
<< "Failed to bind socket " + std::to_string(s) +
" to [::1]:" + std::to_string(phony_port) +
". WSA error: " + std::to_string(WSAGetLastError());
ASSERT_TRUE(listen(s, 1) != SOCKET_ERROR)
<< "Failed to listen on socket " + std::to_string(s) +
". WSA error: " + std::to_string(WSAGetLastError());
sockets.push_back(s);
}
// Do a non-blocking accept followed by a close on all of those sockets.
// Do this in a separate loop to try to induce a time window to hit races.
for (size_t i = 0; i < sockets.size(); i++) {
gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]);
ASSERT_TRUE(accept(sockets[i], nullptr, nullptr) == INVALID_SOCKET)
<< "Accept on phony socket unexpectedly accepted actual connection.";
ASSERT_TRUE(WSAGetLastError() == WSAEWOULDBLOCK)
<< "OpenAndCloseSocketsStressLoop accept on socket " +
std::to_string(sockets[i]) +
" failed in "
"an unexpected way. "
"WSA error: " +
std::to_string(WSAGetLastError()) +
". Socket use-after-close bugs are likely.";
ASSERT_TRUE(closesocket(sockets[i]) != SOCKET_ERROR)
<< "Failed to close socket: " + std::to_string(sockets[i]) +
". WSA error: " + std::to_string(WSAGetLastError());
}
}
return;
}
#else
void OpenAndCloseSocketsStressLoop(int phony_port, gpr_event* done_ev) {
// The goal of this loop is to catch socket
// "use after close" bugs within the c-ares resolver by acting
// like some separate thread doing I/O.
// It's goal is to try to hit race conditions whereby:
// 1) The c-ares resolver closes a socket.
// 2) This loop opens a socket with (coincidentally) the same handle.
// 3) the c-ares resolver mistakenly uses that same socket without
// realizing that its closed.
// 4) This loop performs an operation on that socket that should
// succeed but instead fails because of what the c-ares
// resolver did in the meantime.
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(phony_port);
(reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
for (;;) {
if (gpr_event_get(done_ev)) {
return;
}
std::vector<int> sockets;
// First open a bunch of sockets, bind and listen
// '50' is an arbitrary number that, experimentally,
// has a good chance of catching bugs.
for (size_t i = 0; i < 50; i++) {
int s = socket(AF_INET6, SOCK_STREAM, 0);
int val = 1;
ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) ==
0)
<< "Failed to set socketopt reuseport";
ASSERT_TRUE(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
0)
<< "Failed to set socket reuseaddr";
ASSERT_TRUE(fcntl(s, F_SETFL, O_NONBLOCK) == 0)
<< "Failed to set socket non-blocking";
ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
<< "Failed to create TCP ipv6 socket";
gpr_log(GPR_DEBUG, "Opened fd: %d", s);
ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) == 0)
<< "Failed to bind socket " + std::to_string(s) +
" to [::1]:" + std::to_string(phony_port) +
". errno: " + std::to_string(errno);
ASSERT_TRUE(listen(s, 1) == 0) << "Failed to listen on socket " +
std::to_string(s) +
". errno: " + std::to_string(errno);
sockets.push_back(s);
}
// Do a non-blocking accept followed by a close on all of those sockets.
// Do this in a separate loop to try to induce a time window to hit races.
for (size_t i = 0; i < sockets.size(); i++) {
gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]);
if (accept(sockets[i], nullptr, nullptr)) {
// If e.g. a "shutdown" was called on this fd from another thread,
// then this accept call should fail with an unexpected error.
ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK)
<< "OpenAndCloseSocketsStressLoop accept on socket " +
std::to_string(sockets[i]) +
" failed in "
"an unexpected way. "
"errno: " +
std::to_string(errno) +
". Socket use-after-close bugs are likely.";
}
ASSERT_TRUE(close(sockets[i]) == 0)
<< "Failed to close socket: " + std::to_string(sockets[i]) +
". errno: " + std::to_string(errno);
}
}
}
#endif
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
static std::unique_ptr<grpc_core::Resolver::ResultHandler> Create(
@ -664,18 +524,11 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
}
TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
// Start up background stress thread
int phony_port = grpc_pick_unused_port_or_die();
gpr_event done_ev;
gpr_event_init(&done_ev);
std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, phony_port,
&done_ev);
grpc_core::testing::SocketUseAfterCloseDetector
socket_use_after_close_detector;
// Run the resolver test
RunResolvesRelevantRecordsTest(ResultHandler::Create,
grpc_core::ChannelArgs());
// Shutdown and join stress thread
gpr_event_set(&done_ev, reinterpret_cast<void*>(1));
socket_stress_thread.join();
}
TEST(ResolverComponentTest, TestDoesntCrashOrHangWith1MsTimeout) {

Loading…
Cancel
Save