|
|
|
@ -185,7 +185,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
initial_metadata_add_lb_token(wc_arg->initial_metadata, |
|
|
|
|
wc_arg->lb_token_mdelem_storage, |
|
|
|
|
wc_arg->lb_token); |
|
|
|
|
user_data_copy(wc_arg->lb_token)); |
|
|
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); |
|
|
|
|
gpr_free(wc_arg->owning_pending_node); |
|
|
|
@ -302,6 +302,12 @@ typedef struct glb_lb_policy { |
|
|
|
|
* response has arrived. */ |
|
|
|
|
grpc_grpclb_serverlist *serverlist; |
|
|
|
|
|
|
|
|
|
/** total number of valid addresses received in \a serverlist */ |
|
|
|
|
size_t num_ok_serverlist_addresses; |
|
|
|
|
|
|
|
|
|
/** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */ |
|
|
|
|
grpc_lb_address *lb_addresses; |
|
|
|
|
|
|
|
|
|
/** list of picks that are waiting on RR's policy connectivity */ |
|
|
|
|
pending_pick *pending_picks; |
|
|
|
|
|
|
|
|
@ -329,32 +335,39 @@ struct rr_connectivity_data { |
|
|
|
|
glb_lb_policy *glb_policy; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/* populate \a addresses according to \a serverlist. Returns the number of
|
|
|
|
|
* addresses successfully parsed and added to \a addresses */ |
|
|
|
|
static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
grpc_lb_address **lb_addresses) { |
|
|
|
|
size_t num_valid = 0; |
|
|
|
|
/* first pass: count how many are valid in order to allocate the necessary
|
|
|
|
|
* memory in a single block */ |
|
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
|
const grpc_grpclb_server *server = serverlist->servers[i]; |
|
|
|
|
const grpc_grpclb_ip_address *ip = &server->ip_address; |
|
|
|
|
|
|
|
|
|
if (server->port >> 16 != 0) { |
|
|
|
|
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, |
|
|
|
|
bool log) { |
|
|
|
|
const grpc_grpclb_ip_address *ip = &server->ip_address; |
|
|
|
|
if (server->port >> 16 != 0) { |
|
|
|
|
if (log) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Invalid port '%d' at index %zu of serverlist. Ignoring.", |
|
|
|
|
server->port, i); |
|
|
|
|
continue; |
|
|
|
|
server->port, idx); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (ip->size != 4 && ip->size != 16) { |
|
|
|
|
if (ip->size != 4 && ip->size != 16) { |
|
|
|
|
if (log) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Expected IP to be 4 or 16 bytes, got %d at index %zu of " |
|
|
|
|
"serverlist. Ignoring", |
|
|
|
|
ip->size, i); |
|
|
|
|
continue; |
|
|
|
|
ip->size, idx); |
|
|
|
|
} |
|
|
|
|
++num_valid; |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* populate \a addresses according to \a serverlist. Returns the number of
|
|
|
|
|
* addresses successfully parsed and added to \a addresses */ |
|
|
|
|
static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
grpc_lb_address **lb_addresses) { |
|
|
|
|
size_t num_valid = 0; |
|
|
|
|
/* first pass: count how many are valid in order to allocate the necessary
|
|
|
|
|
* memory in a single block */ |
|
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
|
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; |
|
|
|
|
} |
|
|
|
|
if (num_valid == 0) { |
|
|
|
|
return 0; |
|
|
|
@ -368,9 +381,14 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); |
|
|
|
|
|
|
|
|
|
/* second pass: actually populate the addresses and LB tokens (aka user data
|
|
|
|
|
* to the outside world) to be read by the RR policy during its creation */ |
|
|
|
|
* to the outside world) to be read by the RR policy during its creation. |
|
|
|
|
* Given that the validity tests are very cheap, they are performed again |
|
|
|
|
* instead of marking the valid ones during the first pass, as this would |
|
|
|
|
* incurr in an allocation due to the arbitrary number of server */ |
|
|
|
|
size_t num_processed = 0; |
|
|
|
|
for (size_t i = 0; i < num_valid; ++i) { |
|
|
|
|
const grpc_grpclb_server *server = serverlist->servers[i]; |
|
|
|
|
if (!is_server_valid(serverlist->servers[i], i, false)) continue; |
|
|
|
|
grpc_lb_address *const lb_addr = &lb_addrs[i]; |
|
|
|
|
|
|
|
|
|
/* lb token processing */ |
|
|
|
@ -410,7 +428,9 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
addr6->sin6_port = netorder_port; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(*sa_len > 0); |
|
|
|
|
++num_processed; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(num_processed == num_valid); |
|
|
|
|
*lb_addresses = lb_addrs; |
|
|
|
|
return num_valid; |
|
|
|
|
} |
|
|
|
@ -423,19 +443,27 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lb_policy_args args; |
|
|
|
|
memset(&args, 0, sizeof(args)); |
|
|
|
|
args.client_channel_factory = glb_policy->cc_factory; |
|
|
|
|
args.num_addresses = process_serverlist(serverlist, &args.lb_addresses); |
|
|
|
|
args.user_data_vtable.copy = user_data_copy; |
|
|
|
|
args.user_data_vtable.destroy = user_data_destroy; |
|
|
|
|
const size_t num_ok_addresses = |
|
|
|
|
process_serverlist(serverlist, &args.lb_addresses); |
|
|
|
|
args.num_addresses = num_ok_addresses; |
|
|
|
|
|
|
|
|
|
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); |
|
|
|
|
|
|
|
|
|
glb_policy->num_ok_serverlist_addresses = num_ok_addresses; |
|
|
|
|
if (glb_policy->lb_addresses != NULL) { |
|
|
|
|
/* dispose of the previous version */ |
|
|
|
|
for (size_t i = 0; i < num_ok_addresses; ++i) { |
|
|
|
|
user_data_destroy(glb_policy->lb_addresses[i].user_data); |
|
|
|
|
} |
|
|
|
|
gpr_free(glb_policy->lb_addresses); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
glb_policy->lb_addresses = args.lb_addresses; |
|
|
|
|
|
|
|
|
|
if (args.num_addresses > 0) { |
|
|
|
|
/* free "resolved" addresses memblock */ |
|
|
|
|
gpr_free(args.lb_addresses->resolved_address); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < args.num_addresses; ++i) { |
|
|
|
|
args.user_data_vtable.destroy(args.lb_addresses[i].user_data); |
|
|
|
|
} |
|
|
|
|
gpr_free(args.lb_addresses); |
|
|
|
|
return rr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -601,6 +629,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); |
|
|
|
|
} |
|
|
|
|
gpr_mu_destroy(&glb_policy->mu); |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < glb_policy->num_ok_serverlist_addresses; ++i) { |
|
|
|
|
user_data_destroy(glb_policy->lb_addresses[i].user_data); |
|
|
|
|
} |
|
|
|
|
gpr_free(glb_policy->lb_addresses); |
|
|
|
|
gpr_free(glb_policy); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -762,9 +795,9 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); |
|
|
|
|
|
|
|
|
|
/* add the load reporting initial metadata */ |
|
|
|
|
initial_metadata_add_lb_token(pick_args->initial_metadata, |
|
|
|
|
pick_args->lb_token_mdelem_storage, |
|
|
|
|
glb_policy->wc_arg.lb_token); |
|
|
|
|
initial_metadata_add_lb_token( |
|
|
|
|
pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, |
|
|
|
|
user_data_copy(glb_policy->wc_arg.lb_token)); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, |
|
|
|
|