grpclb address support in the c-ares resolver

reviewable/pr11237/r1
Yuchen Zeng 8 years ago
parent b1f0af8951
commit f3bdbb7ce8
  1. 30
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
  2. 318
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
  3. 14
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h

@ -95,7 +95,7 @@ typedef struct {
gpr_backoff backoff_state;
/** currently resolving addresses */
grpc_resolved_addresses *addresses;
grpc_lb_addresses *lb_addresses;
} ares_dns_resolver;
static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
@ -158,19 +158,10 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_channel_args *result = NULL;
GPR_ASSERT(r->resolving);
r->resolving = false;
if (r->addresses != NULL) {
grpc_lb_addresses *addresses = grpc_lb_addresses_create(
r->addresses->naddrs, NULL /* user_data_vtable */);
for (size_t i = 0; i < r->addresses->naddrs; ++i) {
grpc_lb_addresses_set_address(
addresses, i, &r->addresses->addrs[i].addr,
r->addresses->addrs[i].len, false /* is_balancer */,
NULL /* balancer_name */, NULL /* user_data */);
}
grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses);
if (r->lb_addresses != NULL) {
grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(r->lb_addresses);
result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(exec_ctx, addresses);
grpc_lb_addresses_destroy(exec_ctx, r->lb_addresses);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@ -220,10 +211,10 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
r->resolving = true;
r->addresses = NULL;
grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
r->interested_parties, &r->dns_ares_on_resolved_locked,
&r->addresses);
r->lb_addresses = NULL;
grpc_resolve_grpclb_address_ares(
exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
&r->dns_ares_on_resolved_locked, &r->lb_addresses);
}
static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@ -233,6 +224,7 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = r->resolved_result == NULL
? NULL
: grpc_channel_args_copy(r->resolved_result);
gpr_log(GPR_DEBUG, "dns_ares_maybe_finish_next_locked");
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
r->published_version = r->resolved_version;
@ -255,14 +247,14 @@ static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx,
grpc_resolver_args *args,
const char *default_port) {
// Get name from args.
/* Get name from args. */
const char *path = args->uri->path;
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
}
if (path[0] == '/') ++path;
// Create resolver.
/* Create resolver. */
ares_dns_resolver *r = gpr_zalloc(sizeof(ares_dns_resolver));
grpc_resolver_init(&r->base, &dns_ares_resolver_vtable, args->combiner);
r->name_to_resolve = gpr_strdup(path);

@ -48,6 +48,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include <nameser.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
@ -59,16 +60,16 @@ static gpr_mu g_init_mu;
typedef struct grpc_ares_request {
/** following members are set in grpc_resolve_address_ares_impl */
/** host to resolve, parsed from the name to resolve */
char *host;
/** port to fill in sockaddr_in, parsed from the name to resolve */
char *port;
/** default port to use */
char *default_port;
/** closure to call when the request completes */
grpc_closure *on_done;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses **addrs_out;
union {
grpc_resolved_addresses **addrs;
grpc_lb_addresses **lb_addrs;
} addrs_out;
/** if true, the output addresses are in the format of grpc_lb_addresses,
otherwise they are in the format of grpc_resolved_addresses */
bool lb_addrs_out;
/** the evernt driver used by this request */
grpc_ares_ev_driver *ev_driver;
/** number of ongoing queries */
@ -82,6 +83,18 @@ typedef struct grpc_ares_request {
grpc_error *error;
} grpc_ares_request;
typedef struct grpc_ares_hostbyname_request {
/** following members are set in create_hostbyname_request */
/** the top-level request instance */
grpc_ares_request *parent_request;
/** host to resolve, parsed from the name to resolve */
char *host;
/** port to fill in sockaddr_in, parsed from the name to resolve */
uint16_t port;
/** is it a grpclb address */
bool is_balancer;
} grpc_ares_hostbyname_request;
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
static uint16_t strhtons(const char *port) {
@ -93,6 +106,10 @@ static uint16_t strhtons(const char *port) {
return htons((unsigned short)atoi(port));
}
static void grpc_ares_request_ref(grpc_ares_request *r) {
gpr_ref(&r->pending_queries);
}
static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
grpc_ares_request *r) {
/* If there are no pending queries, invoke on_done callback and destroy the
@ -105,77 +122,200 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
the newly created exec_ctx, since the caller has been warned not to
acquire locks in on_done. ares_dns_resolver is using combiner to
protect resources needed by on_done. */
gpr_log(GPR_DEBUG, "grpc_ares_request_unref NULl");
grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure_sched(&new_exec_ctx, r->on_done, r->error);
grpc_exec_ctx_finish(&new_exec_ctx);
} else {
gpr_log(GPR_DEBUG, "grpc_ares_request_unref exec_ctx");
grpc_closure_sched(exec_ctx, r->on_done, r->error);
}
gpr_mu_destroy(&r->mu);
grpc_ares_ev_driver_destroy(r->ev_driver);
gpr_free(r->host);
gpr_free(r->port);
gpr_free(r->default_port);
gpr_free(r);
}
}
static void on_done_cb(void *arg, int status, int timeouts,
struct hostent *hostent) {
grpc_ares_request *r = (grpc_ares_request *)arg;
static grpc_ares_hostbyname_request *create_hostbyname_request(
grpc_ares_request *parent_request, char *host, uint16_t port,
bool is_balancer) {
grpc_ares_hostbyname_request *hr =
gpr_zalloc(sizeof(grpc_ares_hostbyname_request));
hr->parent_request = parent_request;
hr->host = gpr_strdup(host);
hr->port = port;
hr->is_balancer = is_balancer;
grpc_ares_request_ref(parent_request);
return hr;
}
static void destroy_hostbyname_request(grpc_exec_ctx *exec_ctx,
grpc_ares_hostbyname_request *hr) {
grpc_ares_request_unref(exec_ctx, hr->parent_request);
gpr_free(hr->host);
gpr_free(hr);
}
static void on_hostbyname_done_cb(void *arg, int status, int timeouts,
struct hostent *hostent) {
grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)arg;
grpc_ares_request *r = hr->parent_request;
gpr_mu_lock(&r->mu);
if (status == ARES_SUCCESS) {
GRPC_ERROR_UNREF(r->error);
r->error = GRPC_ERROR_NONE;
r->success = true;
grpc_resolved_addresses **addresses = r->addrs_out;
if (*addresses == NULL) {
*addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
(*addresses)->naddrs = 0;
(*addresses)->addrs = NULL;
if (r->lb_addrs_out) {
grpc_lb_addresses **lb_addresses = r->addrs_out.lb_addrs;
if (*lb_addresses == NULL) {
*lb_addresses = grpc_lb_addresses_create(0, NULL);
}
size_t prev_naddr = (*lb_addresses)->num_addresses;
size_t i;
for (i = 0; hostent->h_addr_list[i] != NULL; i++) {
}
(*lb_addresses)->num_addresses += i;
(*lb_addresses)->addresses =
gpr_realloc((*lb_addresses)->addresses,
sizeof(grpc_lb_address) * (*lb_addresses)->num_addresses);
for (i = prev_naddr; i < (*lb_addresses)->num_addresses; i++) {
memset(&(*lb_addresses)->addresses[i], 0, sizeof(grpc_lb_address));
if (hostent->h_addrtype == AF_INET6) {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in6_addr));
addr.sin6_family = (sa_family_t)hostent->h_addrtype;
addr.sin6_port = hr->port;
grpc_lb_addresses_set_address(
*lb_addresses, i, &addr, addr_len,
hr->is_balancer /* is_balancer */,
hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */,
NULL /* user_data */);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
output, ntohs(hr->port), addr.sin6_scope_id);
} else { /* hostent->h_addrtype == AF_INET6 */
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in_addr));
addr.sin_family = (sa_family_t)hostent->h_addrtype;
addr.sin_port = hr->port;
grpc_lb_addresses_set_address(
*lb_addresses, i, &addr, addr_len,
hr->is_balancer /* is_balancer */,
hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */,
NULL /* user_data */);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
output, ntohs(hr->port));
}
}
} else { /* r->lb_addrs_out */
grpc_resolved_addresses **addresses = r->addrs_out.addrs;
if (*addresses == NULL) {
*addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
(*addresses)->naddrs = 0;
(*addresses)->addrs = NULL;
}
size_t prev_naddr = (*addresses)->naddrs;
size_t i;
for (i = 0; hostent->h_addr_list[i] != NULL; i++) {
}
(*addresses)->naddrs += i;
(*addresses)->addrs =
gpr_realloc((*addresses)->addrs,
sizeof(grpc_resolved_address) * (*addresses)->naddrs);
for (i = prev_naddr; i < (*addresses)->naddrs; i++) {
memset(&(*addresses)->addrs[i], 0, sizeof(grpc_resolved_address));
if (hostent->h_addrtype == AF_INET6) {
(*addresses)->addrs[i].len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 *addr =
(struct sockaddr_in6 *)&(*addresses)->addrs[i].addr;
addr->sin6_family = (sa_family_t)hostent->h_addrtype;
addr->sin6_port = hr->port;
char output[INET6_ADDRSTRLEN];
memcpy(&addr->sin6_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in6_addr));
ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
output, ntohs(hr->port), addr->sin6_scope_id);
} else { /* hostent->h_addrtype == AF_INET6 */
(*addresses)->addrs[i].len = sizeof(struct sockaddr_in);
struct sockaddr_in *addr =
(struct sockaddr_in *)&(*addresses)->addrs[i].addr;
memcpy(&addr->sin_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in_addr));
addr->sin_family = (sa_family_t)hostent->h_addrtype;
addr->sin_port = hr->port;
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
output, ntohs(hr->port));
}
}
}
size_t prev_naddr = (*addresses)->naddrs;
size_t i;
for (i = 0; hostent->h_addr_list[i] != NULL; i++) {
} else if (!r->success) {
char *error_msg;
gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s",
ares_strerror(status));
grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
gpr_free(error_msg);
if (r->error == GRPC_ERROR_NONE) {
r->error = error;
} else {
r->error = grpc_error_add_child(error, r->error);
}
(*addresses)->naddrs += i;
(*addresses)->addrs =
gpr_realloc((*addresses)->addrs,
sizeof(grpc_resolved_address) * (*addresses)->naddrs);
for (i = prev_naddr; i < (*addresses)->naddrs; i++) {
memset(&(*addresses)->addrs[i], 0, sizeof(grpc_resolved_address));
if (hostent->h_addrtype == AF_INET6) {
(*addresses)->addrs[i].len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 *addr =
(struct sockaddr_in6 *)&(*addresses)->addrs[i].addr;
addr->sin6_family = (sa_family_t)hostent->h_addrtype;
addr->sin6_port = strhtons(r->port);
char output[INET6_ADDRSTRLEN];
memcpy(&addr->sin6_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in6_addr));
ares_inet_ntop(AF_INET6, &addr->sin6_addr, output, INET6_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %s\n sin6_scope_id: %d\n",
output, r->port, addr->sin6_scope_id);
} else {
(*addresses)->addrs[i].len = sizeof(struct sockaddr_in);
struct sockaddr_in *addr =
(struct sockaddr_in *)&(*addresses)->addrs[i].addr;
memcpy(&addr->sin_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in_addr));
addr->sin_family = (sa_family_t)hostent->h_addrtype;
addr->sin_port = strhtons(r->port);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr->sin_addr, output, INET_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
"c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %s\n",
output, r->port);
}
gpr_mu_unlock(&r->mu);
destroy_hostbyname_request(NULL, hr);
}
static void on_srv_query_done_cb(void *arg, int status, int timeouts,
unsigned char *abuf, int alen) {
grpc_ares_request *r = (grpc_ares_request *)arg;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "on_query_srv_done_cb");
if (status == ARES_SUCCESS) {
gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS");
struct ares_srv_reply *reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
if (parse_status == ARES_SUCCESS) {
ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
for (struct ares_srv_reply *srv_it = reply; srv_it != NULL;
srv_it = srv_it->next) {
if (grpc_ipv6_loopback_available()) {
grpc_ares_hostbyname_request *hr = create_hostbyname_request(
r, srv_it->host, srv_it->port, true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6,
on_hostbyname_done_cb, hr);
}
grpc_ares_hostbyname_request *hr = create_hostbyname_request(
r, srv_it->host, srv_it->port, true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb,
hr);
grpc_ares_ev_driver_start(&exec_ctx, r->ev_driver);
}
}
if (reply != NULL) {
ares_free_data(reply);
}
} else if (!r->success) {
char *error_msg;
gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s",
@ -188,15 +328,15 @@ static void on_done_cb(void *arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error);
}
}
gpr_mu_unlock(&r->mu);
grpc_ares_request_unref(NULL, r);
grpc_ares_request_unref(&exec_ctx, r);
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
static void resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done, void **addrs,
bool is_lb_addrs_out) {
/* TODO(zyc): Enable tracing after #9603 is checked in */
/* if (grpc_dns_trace) {
gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s",
@ -235,19 +375,33 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
gpr_mu_init(&r->mu);
r->ev_driver = ev_driver;
r->on_done = on_done;
r->addrs_out = addrs;
r->default_port = gpr_strdup(default_port);
r->port = port;
r->host = host;
r->lb_addrs_out = is_lb_addrs_out;
if (is_lb_addrs_out) {
r->addrs_out.lb_addrs = (grpc_lb_addresses **)addrs;
} else {
r->addrs_out.addrs = (grpc_resolved_addresses **)addrs;
}
r->success = false;
r->error = GRPC_ERROR_NONE;
ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
gpr_ref_init(&r->pending_queries, 2);
gpr_ref_init(&r->pending_queries, 1);
if (grpc_ipv6_loopback_available()) {
gpr_ref(&r->pending_queries);
ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r);
grpc_ares_hostbyname_request *hr = create_hostbyname_request(
r, host, strhtons(port), false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr);
}
grpc_ares_hostbyname_request *hr = create_hostbyname_request(
r, host, strhtons(port), false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr);
if (is_lb_addrs_out) {
/* Query the SRV record */
grpc_ares_request_ref(r);
char *service_name;
gpr_asprintf(&service_name, "_grpclb._tcp.%s", host);
ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb,
r);
gpr_free(service_name);
}
ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r);
/* TODO(zyc): Handle CNAME records here. */
grpc_ares_ev_driver_start(exec_ctx, r->ev_driver);
grpc_ares_request_unref(exec_ctx, r);
@ -258,6 +412,26 @@ error_cleanup:
gpr_free(port);
}
void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
resolve_address_ares_impl(exec_ctx, name, default_port, interested_parties,
on_done, (void **)addrs,
false /* is_lb_addrs_out */);
}
void grpc_resolve_grpclb_address_ares(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_lb_addresses **addrs) {
resolve_address_ares_impl(exec_ctx, name, default_port, interested_parties,
on_done, (void **)addrs,
true /* is_lb_addrs_out */);
}
void (*grpc_resolve_address_ares)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_pollset_set *interested_parties, grpc_closure *on_done,

@ -34,6 +34,7 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -51,6 +52,19 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
grpc_closure *on_done,
grpc_resolved_addresses **addresses);
/* Asynchronously resolve addr. It will try to resolve grpclb SRV records in
addition to the normal address records. For normal address records, it uses
\a default_port if a port isn't designated in \a addr, otherwise it uses the
port in \a addr. grpc_ares_init() must be called at least once before this
function. \a on_done may be called directly in this function without being
scheduled with \a exec_ctx, it must not try to acquire locks that are being
held by the caller. */
void grpc_resolve_grpclb_address_ares(grpc_exec_ctx *exec_ctx, const char *addr,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_lb_addresses **addresses);
/* Initialize gRPC ares wrapper. Must be called at least once before
grpc_resolve_address_ares(). */
grpc_error *grpc_ares_init(void);

Loading…
Cancel
Save