Change LB policies to get their input from channel args.

pull/8462/head
Mark D. Roth 8 years ago
parent 3686996786
commit 5bd7be0c55
  1. 10
      src/core/ext/client_config/client_channel.c
  2. 61
      src/core/ext/lb_policy/grpclb/grpclb.c
  3. 31
      src/core/ext/lb_policy/pick_first/pick_first.c
  4. 30
      src/core/ext/lb_policy/round_robin/round_robin.c
  5. 55
      src/core/lib/channel/channel_args.c
  6. 11
      src/core/lib/channel/channel_args.h

@ -192,11 +192,17 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolver_result_get_channel_args(chand->resolver_result); grpc_resolver_result_get_channel_args(chand->resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.client_channel_factory = chand->client_channel_factory;
// Find LB policy name.
const char *lb_policy_name = NULL;
const grpc_arg *lb_policy_name_arg =
grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
if (lb_policy_name_arg != NULL) {
GPR_ASSERT(lb_policy_name_arg->type == GRPC_ARG_STRING);
lb_policy_name = lb_policy_name_arg->value.string;
}
// Special case: If all of the addresses are balancer addresses, // Special case: If all of the addresses are balancer addresses,
// assume that we should use the grpclb policy, regardless of what the // assume that we should use the grpclb policy, regardless of what the
// resolver actually specified. // resolver actually specified.
const char *lb_policy_name =
grpc_resolver_result_get_lb_policy_name(chand->resolver_result);
bool found_backend_address = false; bool found_backend_address = false;
for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) { for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) {
if (!lb_policy_args.addresses->addresses[i].is_balancer) { if (!lb_policy_args.addresses->addresses[i].is_balancer) {

@ -470,9 +470,17 @@ static grpc_lb_policy *create_rr_locked(
args.server_name = glb_policy->server_name; args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory; args.client_channel_factory = glb_policy->cc_factory;
args.addresses = process_serverlist(serverlist); args.addresses = process_serverlist(serverlist);
args.args = glb_policy->args;
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg arg = grpc_lb_addresses_create_channel_arg(args.addresses);
args.args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
grpc_channel_args_destroy(args.args);
if (glb_policy->addresses != NULL) { if (glb_policy->addresses != NULL) {
/* dispose of the previous version */ /* dispose of the previous version */
@ -574,6 +582,13 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) { grpc_lb_policy_args *args) {
/* Get server name. */
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char* server_name =
arg != NULL && arg->type == GRPC_ARG_STRING
? arg->value.string : NULL;
/* Count the number of gRPC-LB addresses. There must be at least one. /* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the * TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using * future, we may change the behavior such that we fall back to using
@ -581,22 +596,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* time, this should be changed to allow a list with no balancer addresses, * time, this should be changed to allow a list with no balancer addresses,
* since the resolver might fail to return a balancer address even when * since the resolver might fail to return a balancer address even when
* this is the right LB policy to use. */ * this is the right LB policy to use. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses* addresses = arg->value.pointer.p;
size_t num_grpclb_addrs = 0; size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < args->addresses->num_addresses; ++i) { for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
} }
if (num_grpclb_addrs == 0) return NULL; if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims /* All input addresses in addresses come from a resolver that claims
* they are LB services. It's the resolver's responsibility to make sure * they are LB services. It's the resolver's responsibility to make sure
* this * this
* policy is only instantiated and used in that case. * policy is only instantiated and used in that case.
* *
* Create a client channel over them to communicate with a LB service */ * Create a client channel over them to communicate with a LB service */
glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->server_name = gpr_strdup(server_name);
glb_policy->cc_factory = args->client_channel_factory; glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->args); glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL); GPR_ASSERT(glb_policy->cc_factory != NULL);
@ -606,20 +624,20 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* TODO(dgq): support mixed ip version */ * TODO(dgq): support mixed ip version */
char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
size_t addr_index = 0; size_t addr_index = 0;
for (size_t i = 0; i < args->addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
if (args->addresses->addresses[i].user_data != NULL) { if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored"); "This LB policy doesn't support user data. It will be ignored");
} }
if (args->addresses->addresses[i].is_balancer) { if (addresses->addresses[i].is_balancer) {
if (addr_index == 0) { if (addr_index == 0) {
addr_strs[addr_index++] = grpc_sockaddr_to_uri( addr_strs[addr_index++] = grpc_sockaddr_to_uri(
(const struct sockaddr *)&args->addresses->addresses[i] (const struct sockaddr *)&addresses->addresses[i]
.address.addr); .address.addr);
} else { } else {
GPR_ASSERT(grpc_sockaddr_to_string( GPR_ASSERT(grpc_sockaddr_to_string(
&addr_strs[addr_index++], &addr_strs[addr_index++],
(const struct sockaddr *)&args->addresses->addresses[i] (const struct sockaddr *)&addresses->addresses[i]
.address.addr, .address.addr,
true) > 0); true) > 0);
} }
@ -629,10 +647,29 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
num_grpclb_addrs, ",", &uri_path_len); num_grpclb_addrs, ",", &uri_path_len);
/* will pick using pick_first */ /* Create a channel to talk to the LBs.
*
* We strip out the channel arg for the LB policy name, since we want
* to use the default (pick_first) in this case.
*
* We also strip out the channel arg for the resolved addresses, since
* that will be generated by the name resolver used in the LB channel.
* Note that the LB channel will use the sockaddr resolver, so this
* won't actually generate a query to DNS (or some other name service).
* However, the addresses returned by the sockaddr resolver will have
* is_balancer=false, whereas our own addresses have is_balancer=true.
* We need the LB channel to return addresses with is_balancer=false
* so that it does not wind up recursively using the grpclb LB policy,
* as per the special case logic in client_channel.c.
*/
static const char* keys_to_remove[] = {
GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES};
grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
glb_policy->lb_channel = grpc_client_channel_factory_create_channel( glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
exec_ctx, glb_policy->cc_factory, target_uri_str, exec_ctx, glb_policy->cc_factory, target_uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args->args); GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
grpc_channel_args_destroy(new_args);
gpr_free(target_uri_str); gpr_free(target_uri_str);
for (size_t i = 0; i < num_grpclb_addrs; i++) { for (size_t i = 0; i < num_grpclb_addrs; i++) {

@ -34,7 +34,9 @@
#include <string.h> #include <string.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick { typedef struct pending_pick {
@ -432,14 +434,23 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) { grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL); GPR_ASSERT(args->client_channel_factory != NULL);
/* Get server name. */
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char* server_name =
arg != NULL && arg->type == GRPC_ARG_STRING
? arg->value.string : NULL;
/* Find the number of backend addresses. We ignore balancer /* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */ * addresses, since we don't know how to handle them. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses* addresses = arg->value.pointer.p;
size_t num_addrs = 0; size_t num_addrs = 0;
for (size_t i = 0; i < args->addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!args->addresses->addresses[i].is_balancer) ++num_addrs; if (!addresses->addresses[i].is_balancer) ++num_addrs;
} }
if (num_addrs == 0) return NULL; if (num_addrs == 0) return NULL;
@ -450,22 +461,22 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args; grpc_subchannel_args sc_args;
size_t subchannel_idx = 0; size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */ /* Skip balancer addresses, since we only know how to handle backends. */
if (args->addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].is_balancer) continue;
if (args->addresses->addresses[i].user_data != NULL) { if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored"); "This LB policy doesn't support user data. It will be ignored");
} }
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes /* server_name will be copied as part of the subchannel creation. This makes
* the copying of args->server_name (a borrowed pointer) OK. */ * the copying of server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name; sc_args.server_name = server_name;
sc_args.addr = sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr); (struct sockaddr *)(&addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len; sc_args.addr_len = addresses->addresses[i].address.len;
sc_args.args = args->args; sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(

@ -64,6 +64,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
@ -598,14 +599,23 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) { grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL); GPR_ASSERT(args->client_channel_factory != NULL);
/* Get server name. */
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char* server_name =
arg != NULL && arg->type == GRPC_ARG_STRING
? arg->value.string : NULL;
/* Find the number of backend addresses. We ignore balancer /* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */ * addresses, since we don't know how to handle them. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses* addresses = arg->value.pointer.p;
size_t num_addrs = 0; size_t num_addrs = 0;
for (size_t i = 0; i < args->addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!args->addresses->addresses[i].is_balancer) ++num_addrs; if (!addresses->addresses[i].is_balancer) ++num_addrs;
} }
if (num_addrs == 0) return NULL; if (num_addrs == 0) return NULL;
@ -618,17 +628,17 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_subchannel_args sc_args; grpc_subchannel_args sc_args;
size_t subchannel_idx = 0; size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */ /* Skip balancer addresses, since we only know how to handle backends. */
if (args->addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes /* server_name will be copied as part of the subchannel creation. This makes
* the copying of args->server_name (a borrowed pointer) OK. */ * the copying of server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name; sc_args.server_name = server_name;
sc_args.addr = sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr); (struct sockaddr *)(&addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len; sc_args.addr_len = addresses->addresses[i].address.len;
sc_args.args = args->args; sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
@ -641,7 +651,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p; sd->policy = p;
sd->index = subchannel_idx; sd->index = subchannel_idx;
sd->subchannel = subchannel; sd->subchannel = subchannel;
sd->user_data = args->addresses->addresses[i].user_data; sd->user_data = addresses->addresses[i].user_data;
++subchannel_idx; ++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure, grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd); rr_connectivity_changed, sd);

@ -66,22 +66,59 @@ static grpc_arg copy_arg(const grpc_arg *src) {
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add, const grpc_arg *to_add,
size_t num_to_add) { size_t num_to_add) {
return grpc_channel_args_copy_and_add_and_remove(src, NULL, 0, to_add,
num_to_add);
}
grpc_channel_args *grpc_channel_args_copy_and_remove(
const grpc_channel_args *src, const char** to_remove,
size_t num_to_remove) {
return grpc_channel_args_copy_and_add_and_remove(src, to_remove,
num_to_remove, NULL, 0);
}
static bool should_remove_arg(const grpc_arg* arg, const char** to_remove,
size_t num_to_remove) {
for (size_t i = 0; i < num_to_remove; ++i) {
if (strcmp(arg->key, to_remove[i]) == 0) return true;
}
return false;
}
grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
const grpc_channel_args *src, const char** to_remove, size_t num_to_remove,
const grpc_arg *to_add, size_t num_to_add) {
// Figure out how many args we'll be copying.
size_t num_args_to_copy = 0;
if (src != NULL) {
for (size_t i = 0; i < src->num_args; ++i) {
if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
++num_args_to_copy;
}
}
}
// Create result.
grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
size_t i; dst->num_args = num_args_to_copy + num_to_add;
size_t src_num_args = (src == NULL) ? 0 : src->num_args; if (dst->num_args == 0) {
if (!src && !to_add) {
dst->num_args = 0;
dst->args = NULL; dst->args = NULL;
return dst; return dst;
} }
dst->num_args = src_num_args + num_to_add;
dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
for (i = 0; i < src_num_args; i++) { // Copy args from src that are not being removed.
dst->args[i] = copy_arg(&src->args[i]); size_t dst_idx = 0;
if (src != NULL) {
for (size_t i = 0; i < src->num_args; ++i) {
if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
dst->args[dst_idx++] = copy_arg(&src->args[i]);
}
}
} }
for (i = 0; i < num_to_add; i++) { // Add args from to_add.
dst->args[i + src_num_args] = copy_arg(&to_add[i]); for (size_t i = 0; i < num_to_add; ++i) {
dst->args[dst_idx++] = copy_arg(&to_add[i]);
} }
GPR_ASSERT(dst_idx == dst->num_args);
return dst; return dst;
} }

@ -51,6 +51,17 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add, const grpc_arg *to_add,
size_t num_to_add); size_t num_to_add);
/** Copies the arguments in \a src except for those whose keys are in
\a to_remove. */
grpc_channel_args *grpc_channel_args_copy_and_remove(
const grpc_channel_args *src, const char** to_remove, size_t num_to_remove);
/** Copies the arguments from \a src except for those whose keys are in
\a to_remove and appends the arguments in \a to_add. */
grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
const grpc_channel_args *src, const char** to_remove, size_t num_to_remove,
const grpc_arg *to_add, size_t num_to_add);
/** Concatenate args from \a a and \a b into a new instance */ /** Concatenate args from \a a and \a b into a new instance */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b); const grpc_channel_args *b);

Loading…
Cancel
Save