Separate the posix part of the c-ares driver

pull/15780/head
Alexander Polcyn 7 years ago
parent c65066666f
commit 7eda61937e
  1. 1
      BUILD
  2. 2
      CMakeLists.txt
  3. 2
      Makefile
  4. 1
      build.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 1
      gRPC-Core.podspec
  8. 1
      grpc.gemspec
  9. 2
      grpc.gyp
  10. 1
      package.xml
  11. 311
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  12. 36
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  13. 302
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  14. 1
      src/python/grpcio/grpc_core_dependencies.py
  15. 1
      tools/doxygen/Doxyfile.core.internal
  16. 1
      tools/run_tests/generated/sources_and_headers.json

@ -1333,6 +1333,7 @@ grpc_cc_library(
name = "grpc_resolver_dns_ares",
srcs = [
"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc",

@ -1213,6 +1213,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
@ -2503,6 +2504,7 @@ add_library(grpc_unsecure
src/core/ext/transport/inproc/inproc_plugin.cc
src/core/ext/transport/inproc/inproc_transport.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc

@ -3588,6 +3588,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \
@ -4844,6 +4845,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/inproc/inproc_plugin.cc \
src/core/ext/transport/inproc/inproc_transport.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \

@ -725,6 +725,7 @@ filegroups:
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
src:
- src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc

@ -370,6 +370,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \

@ -346,6 +346,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver_posix.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper_fallback.cc " +

@ -785,6 +785,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc',

@ -725,6 +725,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc )

@ -536,6 +536,7 @@
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc',
@ -1246,6 +1247,7 @@
'src/core/ext/transport/inproc/inproc_plugin.cc',
'src/core/ext/transport/inproc/inproc_transport.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc',

@ -730,6 +730,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc" role="src" />

@ -0,0 +1,311 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h>
#include <string.h>
#include <sys/ioctl.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver* ev_driver;
/** a closure wrapping on_readable_locked, which should be
invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure;
/** a closure wrapping on_writable_locked, which should be
invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure;
/** next fd node in the list */
struct fd_node* next;
/** wrapped fd that's polled by grpc's poller for the current platform */
grpc_core::GrpcPolledFd* grpc_polled_fd;
/** if the readable closure has been registered */
bool readable_registered;
/** if the writable closure has been registered */
bool writable_registered;
/** if the fd has been shutdown yet from grpc iomgr perspective */
bool already_shutdown;
} fd_node;
struct grpc_ares_ev_driver {
/** the ares_channel owned by this event driver */
ares_channel channel;
/** pollset set for driving the IO events of the channel */
grpc_pollset_set* pollset_set;
/** refcount of the event driver */
gpr_refcount refs;
/** combiner to synchronize c-ares and I/O callbacks on */
grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
bool working;
/** is this event driver being shut down */
bool shutting_down;
};
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver) {
gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
gpr_ref(&ev_driver->refs);
return ev_driver;
}
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr);
GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
gpr_free(ev_driver);
}
}
static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName());
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
grpc_core::Delete(fdn->grpc_polled_fd);
gpr_free(fdn);
}
static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
if (!fdn->already_shutdown) {
fdn->already_shutdown = true;
fdn->grpc_polled_fd->ShutdownLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
}
}
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) {
char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
ares_strerror(status));
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
gpr_free(err_msg);
gpr_free(*ev_driver);
return err;
}
(*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
(*ev_driver)->working = false;
(*ev_driver)->shutting_down = false;
return GRPC_ERROR_NONE;
}
void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
// We mark the event driver as being shut down. If the event driver
// is working, grpc_ares_notify_on_event_locked will shut down the
// fds; if it's not working, there are no fds to shut down.
ev_driver->shutting_down = true;
grpc_ares_ev_driver_unref(ev_driver);
}
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
fn = fn->next;
}
}
// Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
fd_node dummy_head;
dummy_head.next = *head;
fd_node* node = &dummy_head;
while (node->next != nullptr) {
if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) {
fd_node* ret = node->next;
node->next = node->next->next;
*head = dummy_head.next;
return ret;
}
node = node->next;
}
return nullptr;
}
static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->readable_registered = false;
gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName());
if (error == GRPC_ERROR_NONE) {
do {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
}
static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
fdn->writable_registered = false;
gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName());
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
}
ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel;
}
// Get the file descriptors used by the ev_driver's ares channel, register
// driver_closure with these filedescriptors.
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fd_node* new_list = nullptr;
if (!ev_driver->shutting_down) {
ares_socket_t socks[ARES_GETSOCK_MAXNUM];
int socks_bitmask =
ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked(
socks[i], ev_driver->pollset_set);
gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName());
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
grpc_combiner_scheduler(ev_driver->combiner));
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
grpc_combiner_scheduler(ev_driver->combiner));
}
fdn->next = new_list;
new_list = fdn;
// Register read_closure if the socket is readable and read_closure has
// not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver);
gpr_log(GPR_DEBUG, "notify read on: %s",
fdn->grpc_polled_fd->GetName());
fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure);
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure
// has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %s",
fdn->grpc_polled_fd->GetName());
grpc_ares_ev_driver_ref(ev_driver);
fdn->grpc_polled_fd->RegisterForOnWriteableLocked(
&fdn->write_closure);
fdn->writable_registered = true;
}
}
}
}
// Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
// are therefore no longer in use, so they can be shut down and removed from
// the list.
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) {
fd_node_destroy_locked(cur);
} else {
cur->next = new_list;
new_list = cur;
}
}
ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done.
if (new_list == nullptr) {
ev_driver->working = false;
gpr_log(GPR_DEBUG, "ev driver stop working");
}
}
void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver);
}
}
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include <ares.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/iomgr/pollset_set.h"
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
@ -51,5 +52,40 @@ void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver);
/* Shutdown all the grpc_fds used by \a ev_driver */
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver);
namespace grpc_core {
/* A wrapped fd that integrates with the grpc iomgr of the current platform.
* A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints
* from "ares_socket_t" sockets, and then sign up for readability/writeability
* with that poller, and do shutdown and destruction. */
class GrpcPolledFd {
public:
virtual ~GrpcPolledFd() {}
/* Called when c-ares library is interested and there's no pending callback */
virtual void RegisterForOnReadableLocked(grpc_closure* read_closure)
GRPC_ABSTRACT;
/* Called when c-ares library is interested and there's no pending callback */
virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure)
GRPC_ABSTRACT;
/* Indicates if there is data left even after just being read from */
virtual bool IsFdStillReadableLocked() GRPC_ABSTRACT;
/* Called once and only once. Must cause cancellation of any pending
* read/write callbacks. */
virtual void ShutdownLocked(grpc_error* error) GRPC_ABSTRACT;
/* Get the underlying ares_socket_t that this was created from */
virtual ares_socket_t GetWrappedAresSocketLocked() GRPC_ABSTRACT;
/* A unique name, for logging */
virtual const char* GetName() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
/* Creates a new wrapped fd for the current platform */
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set);
void ConfigureAresChannelLocked(ares_channel* channel);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \
*/

@ -36,286 +36,60 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver* ev_driver;
/** a closure wrapping on_readable_locked, which should be
invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure;
/** a closure wrapping on_writable_locked, which should be
invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure;
/** next fd node in the list */
struct fd_node* next;
/** the grpc_fd owned by this fd node */
grpc_fd* fd;
/** if the readable closure has been registered */
bool readable_registered;
/** if the writable closure has been registered */
bool writable_registered;
/** if the fd has been shutdown yet from grpc iomgr perspective */
bool already_shutdown;
} fd_node;
struct grpc_ares_ev_driver {
/** the ares_channel owned by this event driver */
ares_channel channel;
/** pollset set for driving the IO events of the channel */
grpc_pollset_set* pollset_set;
/** refcount of the event driver */
gpr_refcount refs;
/** combiner to synchronize c-ares and I/O callbacks on */
grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
bool working;
/** is this event driver being shut down */
bool shutting_down;
};
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
grpc_ares_ev_driver* ev_driver) {
gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
gpr_ref(&ev_driver->refs);
return ev_driver;
}
static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr);
GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
gpr_free(ev_driver);
namespace grpc_core {
class GrpcPolledFdPosix : public GrpcPolledFd {
public:
GrpcPolledFdPosix(ares_socket_t as, grpc_pollset_set* driver_pollset_set)
: as_(as) {
gpr_asprintf(&name_, "c-ares fd: %d", (int)as);
fd_ = grpc_fd_create((int)as, name_, false);
grpc_pollset_set_add_fd(driver_pollset_set, fd_);
}
}
static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
/* c-ares library will close the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
int dummy_release_fd;
grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished");
gpr_free(fdn);
}
static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
if (!fdn->already_shutdown) {
fdn->already_shutdown = true;
grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
~GrpcPolledFdPosix() {
gpr_free(name_);
/* c-ares library will close the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
int dummy_release_fd;
grpc_fd_orphan(fd_, nullptr, &dummy_release_fd, "c-ares query finished");
}
}
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) {
char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
ares_strerror(status));
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
gpr_free(err_msg);
gpr_free(*ev_driver);
return err;
void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
grpc_fd_notify_on_read(fd_, read_closure);
}
(*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
(*ev_driver)->working = false;
(*ev_driver)->shutting_down = false;
return GRPC_ERROR_NONE;
}
void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
// We mark the event driver as being shut down. If the event driver
// is working, grpc_ares_notify_on_event_locked will shut down the
// fds; if it's not working, there are no fds to shut down.
ev_driver->shutting_down = true;
grpc_ares_ev_driver_unref(ev_driver);
}
void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
grpc_fd_notify_on_write(fd_, write_closure);
}
void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
fn = fn->next;
bool IsFdStillReadableLocked() override {
size_t bytes_available = 0;
return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 &&
bytes_available > 0;
}
}
// Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node* pop_fd_node_locked(fd_node** head, int fd) {
fd_node dummy_head;
dummy_head.next = *head;
fd_node* node = &dummy_head;
while (node->next != nullptr) {
if (grpc_fd_wrapped_fd(node->next->fd) == fd) {
fd_node* ret = node->next;
node->next = node->next->next;
*head = dummy_head.next;
return ret;
}
node = node->next;
void ShutdownLocked(grpc_error* error) override {
grpc_fd_shutdown(fd_, error);
}
return nullptr;
}
/* Check if \a fd is still readable */
static bool grpc_ares_is_fd_still_readable_locked(
grpc_ares_ev_driver* ev_driver, int fd) {
size_t bytes_available = 0;
return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0;
}
ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false;
gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) {
do {
ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD);
} while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd));
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
}
const char* GetName() override { return name_; }
static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false;
gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
}
char* name_;
ares_socket_t as_;
grpc_fd* fd_;
};
ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel;
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set) {
return grpc_core::New<GrpcPolledFdPosix>(as, driver_pollset_set);
}
// Get the file descriptors used by the ev_driver's ares channel, register
// driver_closure with these filedescriptors.
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fd_node* new_list = nullptr;
if (!ev_driver->shutting_down) {
ares_socket_t socks[ARES_GETSOCK_MAXNUM];
int socks_bitmask =
ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
char* fd_name;
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
fdn->fd = grpc_fd_create(socks[i], fd_name, false);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
grpc_combiner_scheduler(ev_driver->combiner));
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
grpc_combiner_scheduler(ev_driver->combiner));
grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name);
}
fdn->next = new_list;
new_list = fdn;
// Register read_closure if the socket is readable and read_closure has
// not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver);
gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd));
grpc_fd_notify_on_read(fdn->fd, &fdn->read_closure);
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure
// has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d",
grpc_fd_wrapped_fd(fdn->fd));
grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure);
fdn->writable_registered = true;
}
}
}
}
// Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
// are therefore no longer in use, so they can be shut down and removed from
// the list.
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) {
fd_node_destroy_locked(cur);
} else {
cur->next = new_list;
new_list = cur;
}
}
ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done.
if (new_list == nullptr) {
ev_driver->working = false;
gpr_log(GPR_DEBUG, "ev driver stop working");
}
}
void ConfigureAresChannelLocked(ares_channel* channel) {}
void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver);
}
}
} // namespace grpc_core
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */

@ -345,6 +345,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc',

@ -913,6 +913,7 @@ src/core/ext/filters/client_channel/resolver.cc \
src/core/ext/filters/client_channel/resolver.h \
src/core/ext/filters/client_channel/resolver/README.md \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \

@ -10136,6 +10136,7 @@
"name": "grpc_resolver_dns_ares",
"src": [
"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc",
"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc",

Loading…
Cancel
Save