Store subchannel address in a channel arg.

pull/9318/head
Mark D. Roth 8 years ago
parent 6b6954050c
commit 0748f3925c
  1. 3
      src/core/ext/client_channel/connector.h
  2. 40
      src/core/ext/client_channel/subchannel.c
  3. 8
      src/core/ext/client_channel/subchannel.h
  4. 12
      src/core/ext/client_channel/subchannel_index.c
  5. 15
      src/core/ext/lb_policy/pick_first/pick_first.c
  6. 15
      src/core/ext/lb_policy/round_robin/round_robin.c
  7. 10
      src/core/ext/transport/chttp2/client/chttp2_connector.c

@ -48,9 +48,6 @@ struct grpc_connector {
typedef struct {
/** set of pollsets interested in this connection */
grpc_pollset_set *interested_parties;
/** address to connect to */
const grpc_resolved_address *addr;
size_t addr_len;
/** initial connect string to send */
grpc_slice initial_connect_string;
/** deadline for connection */

@ -41,9 +41,12 @@
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/initial_connect_string.h"
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/subchannel_index.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -95,8 +98,6 @@ struct grpc_subchannel {
size_t num_filters;
/** channel arguments */
grpc_channel_args *args;
/** address to connect to */
grpc_resolved_address *addr;
grpc_subchannel_key *key;
@ -211,7 +212,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(exec_ctx, c->args);
gpr_free(c->addr);
grpc_slice_unref_internal(exec_ctx, c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
@ -327,12 +327,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
} else {
c->filters = NULL;
}
c->addr = gpr_malloc(sizeof(grpc_resolved_address));
if (args->addr->len)
memcpy(c->addr, args->addr, sizeof(grpc_resolved_address));
c->pollset_set = grpc_pollset_set_create();
grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args);
const grpc_arg *addr_arg =
grpc_channel_args_find(args->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
grpc_resolved_address *addr = gpr_malloc(sizeof(*addr));
grpc_uri_to_sockaddr(addr_arg->value.string, addr);
grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
grpc_arg new_arg;
new_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS;
new_arg.type = GRPC_ARG_STRING;
new_arg.value.string = grpc_sockaddr_to_uri(addr);
gpr_free(addr);
c->args = grpc_channel_args_copy_and_add_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
gpr_free(new_arg.value.string);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c,
@ -385,7 +395,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.deadline = c->next_attempt;
args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string;
@ -771,3 +780,16 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) {
grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (strcmp(uri->scheme, "ipv4") == 0) {
GPR_ASSERT(parse_ipv4(uri, addr));
} else if (strcmp(uri->scheme, "ipv6") == 0) {
GPR_ASSERT(parse_ipv6(uri, addr));
} else {
GPR_ASSERT(parse_unix(uri, addr));
}
grpc_uri_destroy(uri);
}

@ -40,6 +40,9 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
// Channel arg containing a grpc_resolved_address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
@ -164,8 +167,6 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
/** Address to connect to */
grpc_resolved_address *addr;
};
/** create a subchannel given a connector */
@ -173,4 +174,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
const grpc_subchannel_args *args);
/// Sets \a addr from \a uri_str.
void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */

@ -86,11 +86,6 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
k->args.addr->len = args->addr->len;
if (k->args.addr->len > 0) {
memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address));
}
k->args.args = copy_channel_args(args->args);
return k;
}
@ -108,14 +103,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
grpc_subchannel_key *b) {
int c = GPR_ICMP(a->connector, b->connector);
if (c != 0) return c;
c = GPR_ICMP(a->args.addr->len, b->args.addr->len);
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.addr->len) {
c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
if (c != 0) return c;
}
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
a->args.filter_count * sizeof(*a->args.filters));
@ -129,7 +118,6 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free(k->args.addr);
gpr_free(k);
}

@ -36,7 +36,9 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
@ -466,11 +468,18 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;
grpc_arg addr_arg;
addr_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS;
addr_arg.type = GRPC_ARG_STRING;
addr_arg.value.string =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;

@ -64,8 +64,10 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@ -729,11 +731,18 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;
grpc_arg addr_arg;
addr_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS;
addr_arg.type = GRPC_ARG_STRING;
addr_arg.value.string =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));

@ -43,6 +43,7 @@
#include "src/core/ext/client_channel/connector.h"
#include "src/core/ext/client_channel/http_connect_handshaker.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
@ -220,6 +221,11 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
grpc_connect_out_args *result,
grpc_closure *notify) {
chttp2_connector *c = (chttp2_connector *)con;
const grpc_arg *addr_arg =
grpc_channel_args_find(args->channel_args, GRPC_ARG_SUBCHANNEL_ADDRESS);
GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
grpc_resolved_address addr;
grpc_uri_to_sockaddr(addr_arg->value.string, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
@ -231,8 +237,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
args->interested_parties, args->channel_args,
args->addr, args->deadline);
args->interested_parties, args->channel_args, &addr,
args->deadline);
gpr_mu_unlock(&c->mu);
}

Loading…
Cancel
Save