Change PF and RR to expect only backends.

pull/12342/head
Juanli Shen 8 years ago committed by Juanli Shen
parent f874e7640f
commit 8af54b8bfa
  1. 16
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
  2. 16
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c

@ -296,8 +296,6 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) { const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy; pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
/* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */
const grpc_arg *arg = const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@ -317,11 +315,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return; return;
} }
const grpc_lb_addresses *addresses = arg->value.pointer.p; const grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0; if (addresses->num_addresses == 0) {
for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
if (num_addrs == 0) {
// Empty update. Unsubscribe from all current subchannels and put the // Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE. // channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set( grpc_connectivity_state_set(
@ -333,9 +327,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
} }
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
(void *)p, (unsigned long)num_addrs); (void *)p, (unsigned long)addresses->num_addresses);
} }
grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); grpc_subchannel_args *sc_args =
gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to /* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */ * subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@ -344,7 +339,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
/* Create list of subchannel args for new addresses in \a args. */ /* Create list of subchannel args for new addresses in \a args. */
for (size_t i = 0; i < addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
if (addresses->addresses[i].is_balancer) continue; // If there were any balancer, we would have chosen grpclb policy instead.
GPR_ASSERT(!addresses->addresses[i].is_balancer);
if (addresses->addresses[i].user_data != NULL) { if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored"); "This LB policy doesn't support user data. It will be ignored");

@ -737,8 +737,6 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) { const grpc_lb_policy_args *args) {
round_robin_lb_policy *p = (round_robin_lb_policy *)policy; round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
/* Find the number of backend addresses. We ignore balancer addresses, since
* we don't know how to handle them. */
const grpc_arg *arg = const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@ -757,12 +755,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return; return;
} }
grpc_lb_addresses *addresses = arg->value.pointer.p; grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0; rr_subchannel_list *subchannel_list =
for (size_t i = 0; i < addresses->num_addresses; i++) { rr_subchannel_list_create(p, addresses->num_addresses);
if (!addresses->addresses[i].is_balancer) ++num_addrs; if (addresses->num_addresses == 0) {
}
rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs);
if (num_addrs == 0) {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
@ -794,9 +789,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
GRPC_ARG_LB_ADDRESSES}; GRPC_ARG_LB_ADDRESSES};
/* Create subchannels for addresses in the update. */ /* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */ // If there were any balancer, we would have chosen grpclb policy instead.
if (addresses->addresses[i].is_balancer) continue; GPR_ASSERT(!addresses->addresses[i].is_balancer);
GPR_ASSERT(i < num_addrs);
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg = grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address); grpc_create_subchannel_address_arg(&addresses->addresses[i].address);

Loading…
Cancel
Save