sockaddr_resolver now supports comma-separated list of IPs

pull/3011/head
David Garcia Quintas 9 years ago
parent 43a53ffa0a
commit 2bfd275b2b
  1. 59
      src/core/client_config/resolvers/sockaddr_resolver.c
  2. 36
      test/core/end2end/dualstack_socket_test.c

@ -60,9 +60,12 @@ typedef struct {
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
/** the address that we've 'resolved' */
struct sockaddr_storage addr;
int addr_len;
/** the addresses that we've 'resolved' */
struct sockaddr_storage *addrs;
/** the corresponding length of the addresses */
int *addrs_len;
/** how many elements in \a addrs */
size_t num_addrs;
/** mutex guarding the rest of the state */
gpr_mu mu;
@ -119,17 +122,22 @@ static void sockaddr_next(grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_subchannel *subchannel;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
size_t i;
cfg = grpc_client_config_create();
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)&r->addr;
args.addr_len = r->addr_len;
subchannel =
grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
lb_policy = r->lb_policy_factory(&subchannel, 1);
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs);
for (i = 0; i < r->num_addrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)&r->addrs[i];
args.addr_len = r->addrs_len[i];
subchannels[i] = grpc_subchannel_factory_create_subchannel(
r->subchannel_factory, &args);
}
lb_policy = r->lb_policy_factory(subchannels, r->num_addrs);
gpr_free(subchannels);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
@ -143,6 +151,8 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r);
}
@ -238,13 +248,18 @@ done:
return result;
}
static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
size_t i;
int errors_found = 0; /* GPR_FALSE */
sockaddr_resolver *r;
gpr_slice path_slice;
gpr_slice_buffer path_parts;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
@ -253,7 +268,29 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
if (!parse(uri, &r->addr, &r->addr_len)) {
path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
r->num_addrs = path_parts.count;
r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs);
r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs);
for(i = 0; i < r->num_addrs; i++) {
grpc_uri ith_uri = *uri;
char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
errors_found = 1; /* GPR_TRUE */
}
gpr_free(part_str);
if (errors_found) break;
}
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r);
return NULL;
}

@ -32,12 +32,16 @@
*/
#include <string.h>
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -57,6 +61,7 @@ static void drain_cq(grpc_completion_queue *cq) {
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void do_nothing(void *ignored) {}
void test_connect(const char *server_host, const char *client_host, int port,
int expect_ok) {
char *client_hostport;
@ -109,8 +114,30 @@ void test_connect(const char *server_host, const char *client_host, int port,
/* Create client. */
if (client_host[0] == 'i') {
/* for ipv4:/ipv6: addresses, just concatenate the port */
gpr_asprintf(&client_hostport, "%s:%d", client_host, port);
/* for ipv4:/ipv6: addresses, concatenate the port to each of the parts */
size_t i;
gpr_slice uri_slice;
gpr_slice_buffer uri_parts;
char **hosts_with_port;
uri_slice =
gpr_slice_new((char *)client_host, strlen(client_host), do_nothing);
gpr_slice_buffer_init(&uri_parts);
gpr_slice_split(uri_slice, ",", &uri_parts);
hosts_with_port = gpr_malloc(sizeof(char*) * uri_parts.count);
for (i = 0; i < uri_parts.count; i++) {
char *uri_part_str = gpr_dump_slice(uri_parts.slices[i], GPR_DUMP_ASCII);
gpr_asprintf(&hosts_with_port[i], "%s:%d", uri_part_str, port);
gpr_free(uri_part_str);
}
client_hostport = gpr_strjoin_sep((const char **)hosts_with_port,
uri_parts.count, ",", NULL);
for (i = 0; i < uri_parts.count; i++) {
gpr_free(hosts_with_port[i]);
}
gpr_free(hosts_with_port);
gpr_slice_buffer_destroy(&uri_parts);
gpr_slice_unref(uri_slice);
} else {
gpr_join_host_port(&client_hostport, client_host, port);
}
@ -260,7 +287,8 @@ int main(int argc, char **argv) {
test_connect("0.0.0.0", "127.0.0.1", 0, 1);
test_connect("0.0.0.0", "::ffff:127.0.0.1", 0, 1);
test_connect("0.0.0.0", "ipv4:127.0.0.1", 0, 1);
test_connect("0.0.0.0", "ipv6:[::ffff:127.0.0.1]", 0, 1);
test_connect("0.0.0.0", "ipv4:127.0.0.1,127.0.0.2,127.0.0.3", 0, 1);
test_connect("0.0.0.0", "ipv6:[::ffff:127.0.0.1],[::ffff:127.0.0.2]", 0, 1);
test_connect("0.0.0.0", "localhost", 0, 1);
if (do_ipv6) {
test_connect("::", "::1", 0, 1);

Loading…
Cancel
Save