Merge pull request #15435 from apolcyn/fix_shutdown_closed_socket

Fix shutdown of closed fd when c-ares opens a second fd
pull/15648/head
apolcyn 7 years ago committed by GitHub
commit 701a4b1c32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      CMakeLists.txt
  2. 48
      Makefile
  3. 65
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  4. 7
      src/core/lib/iomgr/ev_epollex_linux.cc
  5. 18
      test/cpp/naming/BUILD
  6. 289
      test/cpp/naming/cancel_ares_query_test.cc
  7. 19
      test/cpp/naming/gen_build_yaml.py
  8. 110
      test/cpp/naming/resolver_component_test.cc
  9. 20
      tools/run_tests/generated/sources_and_headers.json
  10. 22
      tools/run_tests/generated/tests.json

@ -663,6 +663,9 @@ endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx address_sorting_test) add_dependencies(buildtests_cxx address_sorting_test)
endif() endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx cancel_ares_query_test)
endif()
add_custom_target(buildtests add_custom_target(buildtests
DEPENDS buildtests_c buildtests_cxx) DEPENDS buildtests_c buildtests_cxx)
@ -15840,6 +15843,48 @@ target_link_libraries(address_sorting_test
${_gRPC_GFLAGS_LIBRARIES} ${_gRPC_GFLAGS_LIBRARIES}
) )
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(cancel_ares_query_test
test/cpp/naming/cancel_ares_query_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(cancel_ares_query_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(cancel_ares_query_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
gpr_test_util
grpc++
grpc
gpr
grpc++_test_config
${_gRPC_GFLAGS_LIBRARIES}
)
endif() endif()
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS)

@ -1316,6 +1316,7 @@ resolver_component_tests_runner_invoker_unsecure: $(BINDIR)/$(CONFIG)/resolver_c
resolver_component_tests_runner_invoker: $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker resolver_component_tests_runner_invoker: $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker
address_sorting_test_unsecure: $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure address_sorting_test_unsecure: $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure
address_sorting_test: $(BINDIR)/$(CONFIG)/address_sorting_test address_sorting_test: $(BINDIR)/$(CONFIG)/address_sorting_test
cancel_ares_query_test: $(BINDIR)/$(CONFIG)/cancel_ares_query_test
alts_credentials_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry alts_credentials_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry
api_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/api_fuzzer_one_entry api_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/api_fuzzer_one_entry
client_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/client_fuzzer_one_entry client_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/client_fuzzer_one_entry
@ -1753,6 +1754,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \ $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \
$(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \ $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \
$(BINDIR)/$(CONFIG)/address_sorting_test \ $(BINDIR)/$(CONFIG)/address_sorting_test \
$(BINDIR)/$(CONFIG)/cancel_ares_query_test \
else else
buildtests_cxx: privatelibs_cxx \ buildtests_cxx: privatelibs_cxx \
@ -1877,6 +1879,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \ $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker \
$(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \ $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure \
$(BINDIR)/$(CONFIG)/address_sorting_test \ $(BINDIR)/$(CONFIG)/address_sorting_test \
$(BINDIR)/$(CONFIG)/cancel_ares_query_test \
endif endif
@ -2362,6 +2365,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure || ( echo test address_sorting_test_unsecure failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/address_sorting_test_unsecure || ( echo test address_sorting_test_unsecure failed ; exit 1 )
$(E) "[RUN] Testing address_sorting_test" $(E) "[RUN] Testing address_sorting_test"
$(Q) $(BINDIR)/$(CONFIG)/address_sorting_test || ( echo test address_sorting_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/address_sorting_test || ( echo test address_sorting_test failed ; exit 1 )
$(E) "[RUN] Testing cancel_ares_query_test"
$(Q) $(BINDIR)/$(CONFIG)/cancel_ares_query_test || ( echo test cancel_ares_query_test failed ; exit 1 )
flaky_test_cxx: buildtests_cxx flaky_test_cxx: buildtests_cxx
@ -23687,6 +23692,49 @@ endif
endif endif
CANCEL_ARES_QUERY_TEST_SRC = \
test/cpp/naming/cancel_ares_query_test.cc \
CANCEL_ARES_QUERY_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CANCEL_ARES_QUERY_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/cancel_ares_query_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/cancel_ares_query_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/cancel_ares_query_test: $(PROTOBUF_DEP) $(CANCEL_ARES_QUERY_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(CANCEL_ARES_QUERY_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/cancel_ares_query_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/naming/cancel_ares_query_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
deps_cancel_ares_query_test: $(CANCEL_ARES_QUERY_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(CANCEL_ARES_QUERY_TEST_OBJS:.o=.dep)
endif
endif
ALTS_CREDENTIALS_FUZZER_ONE_ENTRY_SRC = \ ALTS_CREDENTIALS_FUZZER_ONE_ENTRY_SRC = \
test/core/security/alts_credentials_fuzzer.cc \ test/core/security/alts_credentials_fuzzer.cc \
test/core/util/one_corpus_entry_fuzzer.cc \ test/core/util/one_corpus_entry_fuzzer.cc \

@ -21,6 +21,7 @@
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h> #include <ares.h>
#include <string.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
@ -55,8 +56,8 @@ typedef struct fd_node {
bool readable_registered; bool readable_registered;
/** if the writable closure has been registered */ /** if the writable closure has been registered */
bool writable_registered; bool writable_registered;
/** if the fd is being shut down */ /** if the fd has been shutdown yet from grpc iomgr perspective */
bool shutting_down; bool already_shutdown;
} fd_node; } fd_node;
struct grpc_ares_ev_driver { struct grpc_ares_ev_driver {
@ -101,25 +102,26 @@ static void fd_node_destroy(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu); gpr_mu_destroy(&fdn->mu);
/* TODO: we need to pass a non-null "release_fd" parameter to
* grpc_fd_orphan because "epollsig" iomgr will close the fd
* even if "already_closed" is true, and it only leaves it open
* if "release_fd" is non-null. This is unlike the rest of the
* pollers, should this be changed within epollsig? */
int dummy_release_fd;
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */, grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */,
"c-ares query finished"); "c-ares query finished");
gpr_free(fdn); gpr_free(fdn);
} }
static void fd_node_shutdown(fd_node* fdn) { static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
gpr_mu_lock(&fdn->mu); if (!fdn->already_shutdown) {
fdn->shutting_down = true; fdn->already_shutdown = true;
if (!fdn->readable_registered && !fdn->writable_registered) { grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
gpr_mu_unlock(&fdn->mu);
fd_node_destroy(fdn);
} else {
grpc_fd_shutdown(
fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown"));
gpr_mu_unlock(&fdn->mu);
} }
} }
@ -127,7 +129,10 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set) { grpc_pollset_set* pollset_set) {
*ev_driver = static_cast<grpc_ares_ev_driver*>( *ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver))); gpr_malloc(sizeof(grpc_ares_ev_driver)));
int status = ares_init(&(*ev_driver)->channel); ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
if (status != ARES_SUCCESS) { if (status != ARES_SUCCESS) {
char* err_msg; char* err_msg;
@ -164,8 +169,9 @@ void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds; fd_node* fn = ev_driver->fds;
while (fn != nullptr) { while (fn != nullptr) {
grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( gpr_mu_lock(&fn->mu);
"grpc_ares_ev_driver_shutdown")); fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
gpr_mu_unlock(&fn->mu);
fn = fn->next; fn = fn->next;
} }
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
@ -202,14 +208,7 @@ static void on_readable_cb(void* arg, grpc_error* error) {
gpr_mu_lock(&fdn->mu); gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false; fdn->readable_registered = false;
if (fdn->shutting_down && !fdn->writable_registered) {
gpr_mu_unlock(&fdn->mu);
fd_node_destroy(fdn);
grpc_ares_ev_driver_unref(ev_driver);
return;
}
gpr_mu_unlock(&fdn->mu); gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", fd); gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
do { do {
@ -236,14 +235,7 @@ static void on_writable_cb(void* arg, grpc_error* error) {
gpr_mu_lock(&fdn->mu); gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false; fdn->writable_registered = false;
if (fdn->shutting_down && !fdn->readable_registered) {
gpr_mu_unlock(&fdn->mu);
fd_node_destroy(fdn);
grpc_ares_ev_driver_unref(ev_driver);
return;
}
gpr_mu_unlock(&fdn->mu); gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", fd); gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@ -288,7 +280,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fdn->ev_driver = ev_driver; fdn->ev_driver = ev_driver;
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
fdn->shutting_down = false; fdn->already_shutdown = false;
gpr_mu_init(&fdn->mu); gpr_mu_init(&fdn->mu);
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -329,7 +321,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) { while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds; fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next; ev_driver->fds = ev_driver->fds->next;
fd_node_shutdown(cur); gpr_mu_lock(&cur->mu);
fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) {
gpr_mu_unlock(&cur->mu);
fd_node_destroy(cur);
} else {
cur->next = new_list;
new_list = cur;
gpr_mu_unlock(&cur->mu);
}
} }
ev_driver->fds = new_list; ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done. // If the ev driver has no working fd, all the tasks are done.

@ -438,7 +438,12 @@ static bool fd_is_shutdown(grpc_fd* fd) {
/* Might be called multiple times */ /* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) { static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR); if (shutdown(fd->fd, SHUT_RDWR)) {
if (errno != ENOTCONN) {
gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
grpc_fd_wrapped_fd(fd), errno);
}
}
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
} }
GRPC_ERROR_UNREF(why); GRPC_ERROR_UNREF(why);

@ -22,7 +22,7 @@ package(
licenses(["notice"]) # Apache v2 licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_py_binary") load("//bazel:grpc_build_system.bzl", "grpc_py_binary", "grpc_cc_test")
load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests") load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests")
@ -35,4 +35,20 @@ grpc_py_binary(
testonly = True, testonly = True,
) )
grpc_cc_test(
name = "cancel_ares_query_test",
srcs = ["cancel_ares_query_test.cc"],
external_deps = ["gmock"],
deps = [
"//test/cpp/util:test_util",
"//test/core/util:grpc_test_util",
"//test/core/util:gpr_test_util",
"//:grpc++",
"//:grpc",
"//:gpr",
"//test/cpp/util:test_config",
"//test/core/end2end:cq_verifier",
],
)
generate_resolver_component_tests() generate_resolver_component_tests()

@ -0,0 +1,289 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <gflags/gflags.h>
#include <gmock/gmock.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "include/grpc/support/string_util.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/cmdline.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
// TODO: pull in different headers when enabling this
// test on windows. Also set BAD_SOCKET_RETURN_VAL
// to INVALID_SOCKET on windows.
#include "src/core/lib/iomgr/sockaddr_posix.h"
#define BAD_SOCKET_RETURN_VAL -1
namespace {
void* Tag(intptr_t t) { return (void*)t; }
gpr_timespec FiveSecondsFromNow(void) {
return grpc_timeout_seconds_to_deadline(5);
}
void DrainCq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, FiveSecondsFromNow(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
void EndTest(grpc_channel* client, grpc_completion_queue* cq) {
grpc_channel_destroy(client);
grpc_completion_queue_shutdown(cq);
DrainCq(cq);
grpc_completion_queue_destroy(cq);
}
class FakeNonResponsiveDNSServer {
public:
FakeNonResponsiveDNSServer(int port) {
socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
if (socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket");
abort();
}
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port);
((char*)&addr.sin6_addr)[15] = 1;
if (bind(socket_, (const sockaddr*)&addr, sizeof(addr)) != 0) {
gpr_log(GPR_DEBUG, "Failed to bind UDP ipv6 socket to [::1]:%d", port);
abort();
}
}
~FakeNonResponsiveDNSServer() { close(socket_); }
private:
int socket_;
};
struct ArgsStruct {
gpr_atm done_atm;
gpr_mu* mu;
grpc_pollset* pollset;
grpc_pollset_set* pollset_set;
grpc_combiner* lock;
grpc_channel_args* channel_args;
};
void ArgsInit(ArgsStruct* args) {
args->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = grpc_combiner_create();
gpr_atm_rel_store(&args->done_atm, 0);
args->channel_args = nullptr;
}
void DoNothing(void* arg, grpc_error* error) {}
void ArgsFinish(ArgsStruct* args) {
grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
grpc_pollset_set_destroy(args->pollset_set);
grpc_closure DoNothing_cb;
GRPC_CLOSURE_INIT(&DoNothing_cb, DoNothing, nullptr,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(args->pollset, &DoNothing_cb);
// exec_ctx needs to be flushed before calling grpc_pollset_destroy()
grpc_channel_args_destroy(args->channel_args);
grpc_core::ExecCtx::Get()->Flush();
grpc_pollset_destroy(args->pollset);
gpr_free(args->pollset);
GRPC_COMBINER_UNREF(args->lock, nullptr);
}
void PollPollsetUntilRequestDone(ArgsStruct* args) {
while (true) {
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
if (done) {
break;
}
grpc_pollset_worker* worker = nullptr;
grpc_core::ExecCtx exec_ctx;
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(args->pollset, &worker,
grpc_timespec_to_millis_round_up(
gpr_inf_future(GPR_CLOCK_REALTIME))));
gpr_mu_unlock(args->mu);
}
}
void CheckResolverResultAssertFailureLocked(void* arg, grpc_error* error) {
EXPECT_NE(error, GRPC_ERROR_NONE);
ArgsStruct* args = static_cast<ArgsStruct*>(arg);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
TEST(CancelDuringAresQuery, TestCancelActiveDNSQuery) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
int fake_dns_port = grpc_pick_unused_port_or_die();
FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
char* client_target;
GPR_ASSERT(gpr_asprintf(
&client_target,
"dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234",
fake_dns_port));
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::ResolverRegistry::CreateResolver(client_target, nullptr,
args.pollset_set, args.lock);
gpr_free(client_target);
grpc_closure on_resolver_result_changed;
GRPC_CLOSURE_INIT(&on_resolver_result_changed,
CheckResolverResultAssertFailureLocked, (void*)&args,
grpc_combiner_scheduler(args.lock));
resolver->NextLocked(&args.channel_args, &on_resolver_result_changed);
// Without resetting and causing resolver shutdown, the
// PollPollsetUntilRequestDone call should never finish.
resolver.reset();
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args);
ArgsFinish(&args);
}
TEST(CancelDuringAresQuery,
TestHitDeadlineAndDestroyChannelDuringAresResolutionIsGraceful) {
// Start up fake non responsive DNS server
int fake_dns_port = grpc_pick_unused_port_or_die();
FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
// Create a call that will try to use the fake DNS server
char* client_target = nullptr;
GPR_ASSERT(gpr_asprintf(
&client_target,
"dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234",
fake_dns_port));
grpc_channel* client =
grpc_insecure_channel_create(client_target,
/* client_args */ nullptr, nullptr);
gpr_free(client_target);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
cq_verifier* cqv = cq_verifier_create(cq);
gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(10);
grpc_call* call = grpc_channel_create_call(
client, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
GPR_ASSERT(call);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details call_details;
grpc_call_details_init(&call_details);
grpc_status_code status;
const char* error_string;
grpc_slice details;
// Set ops for client the request
grpc_op ops_base[6];
memset(ops_base, 0, sizeof(ops_base));
grpc_op* op = ops_base;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.error_string = &error_string;
op->flags = 0;
op->reserved = nullptr;
op++;
// Run the call and sanity check it failed as expected
grpc_call_error error = grpc_call_start_batch(
call, ops_base, static_cast<size_t>(op - ops_base), Tag(1), nullptr);
EXPECT_EQ(GRPC_CALL_OK, error);
CQ_EXPECT_COMPLETION(cqv, Tag(1), 1);
cq_verify(cqv);
EXPECT_EQ(status, GRPC_STATUS_DEADLINE_EXCEEDED);
// Teardown
grpc_slice_unref(details);
gpr_free((void*)error_string);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_unref(call);
cq_verifier_destroy(cqv);
EndTest(client, cq);
}
} // namespace
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
gpr_setenv("GRPC_DNS_RESOLVER", "ares");
// Sanity check the time that it takes to run the test
// including the teardown time (the teardown
// part of the test involves cancelling the DNS query,
// which is the main point of interest for this test).
gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(4);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
gpr_log(GPR_ERROR, "Test took too long");
abort();
}
return result;
}

@ -121,6 +121,25 @@ def main():
'grpc++_test_config', 'grpc++_test_config',
], ],
} for unsecure_build_config_suffix in ['_unsecure', ''] } for unsecure_build_config_suffix in ['_unsecure', '']
] + [
{
'name': 'cancel_ares_query_test',
'build': 'test',
'language': 'c++',
'gtest': True,
'run': True,
'src': ['test/cpp/naming/cancel_ares_query_test.cc'],
'platforms': ['linux', 'posix', 'mac'],
'deps': [
'grpc++_test_util',
'grpc_test_util',
'gpr_test_util',
'grpc++',
'grpc',
'gpr',
'grpc++_test_config',
],
},
] ]
} }

@ -22,10 +22,14 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <thread>
#include <vector> #include <vector>
#include "test/cpp/util/subprocess.h" #include "test/cpp/util/subprocess.h"
@ -48,6 +52,12 @@
#include "test/core/util/port.h" #include "test/core/util/port.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
// TODO: pull in different headers when enabling this
// test on windows. Also set BAD_SOCKET_RETURN_VAL
// to INVALID_SOCKET on windows.
#include "src/core/lib/iomgr/sockaddr_posix.h"
#define BAD_SOCKET_RETURN_VAL -1
using grpc::SubProcess; using grpc::SubProcess;
using std::vector; using std::vector;
using testing::UnorderedElementsAreArray; using testing::UnorderedElementsAreArray;
@ -231,7 +241,73 @@ void CheckLBPolicyResultLocked(grpc_channel_args* channel_args,
} }
} }
void OpenAndCloseSocketsStressLoop(int dummy_port, gpr_event* done_ev) {
// The goal of this loop is to catch socket
// "use after close" bugs within the c-ares resolver by acting
// like some separate thread doing I/O.
// It's goal is to try to hit race conditions whereby:
// 1) The c-ares resolver closes a socket.
// 2) This loop opens a socket with (coincidentally) the same handle.
// 3) the c-ares resolver mistakenly uses that same socket without
// realizing that its closed.
// 4) This loop performs an operation on that socket that should
// succeed but instead fails because of what the c-ares
// resolver did in the meantime.
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(dummy_port);
((char*)&addr.sin6_addr)[15] = 1;
for (;;) {
if (gpr_event_get(done_ev)) {
return;
}
std::vector<int> sockets;
// First open a bunch of sockets, bind and listen
// '50' is an arbitrary number that, experimentally,
// has a good chance of catching bugs.
for (size_t i = 0; i < 50; i++) {
int s = socket(AF_INET6, SOCK_STREAM, 0);
int val = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
fcntl(s, F_SETFL, O_NONBLOCK);
ASSERT_TRUE(s != BAD_SOCKET_RETURN_VAL)
<< "Failed to create TCP ipv6 socket";
gpr_log(GPR_DEBUG, "Opened fd: %d", s);
ASSERT_TRUE(bind(s, (const sockaddr*)&addr, sizeof(addr)) == 0)
<< "Failed to bind socket " + std::to_string(s) +
" to [::1]:" + std::to_string(dummy_port) +
". errno: " + std::to_string(errno);
ASSERT_TRUE(listen(s, 1) == 0) << "Failed to listen on socket " +
std::to_string(s) +
". errno: " + std::to_string(errno);
sockets.push_back(s);
}
// Do a non-blocking accept followed by a close on all of those sockets.
// Do this in a separate loop to try to induce a time window to hit races.
for (size_t i = 0; i < sockets.size(); i++) {
gpr_log(GPR_DEBUG, "non-blocking accept then close on %d", sockets[i]);
if (accept(sockets[i], nullptr, nullptr)) {
// If e.g. a "shutdown" was called on this fd from another thread,
// then this accept call should fail with an unexpected error.
ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK)
<< "OpenAndCloseSocketsStressLoop accept on socket " +
std::to_string(sockets[i]) +
" failed in "
"an unexpected way. "
"errno: " +
std::to_string(errno) +
". Socket use-after-close bugs are likely.";
}
ASSERT_TRUE(close(sockets[i]) == 0)
<< "Failed to close socket: " + std::to_string(sockets[i]) +
". errno: " + std::to_string(errno);
}
}
}
void CheckResolverResultLocked(void* argsp, grpc_error* err) { void CheckResolverResultLocked(void* argsp, grpc_error* err) {
EXPECT_EQ(err, GRPC_ERROR_NONE);
ArgsStruct* args = (ArgsStruct*)argsp; ArgsStruct* args = (ArgsStruct*)argsp;
grpc_channel_args* channel_args = args->channel_args; grpc_channel_args* channel_args = args->channel_args;
const grpc_arg* channel_arg = const grpc_arg* channel_arg =
@ -271,7 +347,17 @@ void CheckResolverResultLocked(void* argsp, grpc_error* err) {
gpr_mu_unlock(args->mu); gpr_mu_unlock(args->mu);
} }
TEST(ResolverComponentTest, TestResolvesRelevantRecords) { void CheckResolvedWithoutErrorLocked(void* argsp, grpc_error* err) {
EXPECT_EQ(err, GRPC_ERROR_NONE);
ArgsStruct* args = (ArgsStruct*)argsp;
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
void RunResolvesRelevantRecordsTest(void (*OnDoneLocked)(void* arg,
grpc_error* error)) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
ArgsStruct args; ArgsStruct args;
ArgsInit(&args); ArgsInit(&args);
@ -289,14 +375,32 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
args.pollset_set, args.lock); args.pollset_set, args.lock);
gpr_free(whole_uri); gpr_free(whole_uri);
grpc_closure on_resolver_result_changed; grpc_closure on_resolver_result_changed;
GRPC_CLOSURE_INIT(&on_resolver_result_changed, CheckResolverResultLocked, GRPC_CLOSURE_INIT(&on_resolver_result_changed, OnDoneLocked, (void*)&args,
(void*)&args, grpc_combiner_scheduler(args.lock)); grpc_combiner_scheduler(args.lock));
resolver->NextLocked(&args.channel_args, &on_resolver_result_changed); resolver->NextLocked(&args.channel_args, &on_resolver_result_changed);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args); PollPollsetUntilRequestDone(&args);
ArgsFinish(&args); ArgsFinish(&args);
} }
TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
RunResolvesRelevantRecordsTest(CheckResolverResultLocked);
}
TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
// Start up background stress thread
int dummy_port = grpc_pick_unused_port_or_die();
gpr_event done_ev;
gpr_event_init(&done_ev);
std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, dummy_port,
&done_ev);
// Run the resolver test
RunResolvesRelevantRecordsTest(CheckResolvedWithoutErrorLocked);
// Shutdown and join stress thread
gpr_event_set(&done_ev, (void*)1);
socket_stress_thread.join();
}
} // namespace } // namespace
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -6541,6 +6541,26 @@
"third_party": false, "third_party": false,
"type": "target" "type": "target"
}, },
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_config",
"grpc++_test_util",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "cancel_ares_query_test",
"src": [
"test/cpp/naming/cancel_ares_query_test.cc"
],
"third_party": false,
"type": "target"
},
{ {
"deps": [ "deps": [
"gpr", "gpr",

@ -5638,6 +5638,28 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "cancel_ares_query_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"boringssl": true, "boringssl": true,

Loading…
Cancel
Save