|
|
@ -107,6 +107,7 @@ |
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
#include <grpc/support/string_util.h> |
|
|
|
|
|
|
|
|
|
|
|
#include "src/core/ext/client_config/client_channel_factory.h" |
|
|
|
#include "src/core/ext/client_config/client_channel_factory.h" |
|
|
|
|
|
|
|
#include "src/core/ext/client_config/lb_policy_factory.h" |
|
|
|
#include "src/core/ext/client_config/lb_policy_registry.h" |
|
|
|
#include "src/core/ext/client_config/lb_policy_registry.h" |
|
|
|
#include "src/core/ext/client_config/parse_address.h" |
|
|
|
#include "src/core/ext/client_config/parse_address.h" |
|
|
|
#include "src/core/ext/lb_policy/grpclb/grpclb.h" |
|
|
|
#include "src/core/ext/lb_policy/grpclb/grpclb.h" |
|
|
@ -120,18 +121,6 @@ |
|
|
|
|
|
|
|
|
|
|
|
int grpc_lb_glb_trace = 0; |
|
|
|
int grpc_lb_glb_trace = 0; |
|
|
|
|
|
|
|
|
|
|
|
static void lb_addrs_destroy(grpc_lb_address *lb_addresses, |
|
|
|
|
|
|
|
size_t num_addresses) { |
|
|
|
|
|
|
|
/* free "resolved" addresses memblock */ |
|
|
|
|
|
|
|
gpr_free(lb_addresses->resolved_address); |
|
|
|
|
|
|
|
for (size_t i = 0; i < num_addresses; ++i) { |
|
|
|
|
|
|
|
if (lb_addresses[i].user_data != NULL) { |
|
|
|
|
|
|
|
GRPC_MDELEM_UNREF(lb_addresses[i].user_data); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_free(lb_addresses); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* add lb_token of selected subchannel (address) to the call's initial
|
|
|
|
/* add lb_token of selected subchannel (address) to the call's initial
|
|
|
|
* metadata */ |
|
|
|
* metadata */ |
|
|
|
static void initial_metadata_add_lb_token( |
|
|
|
static void initial_metadata_add_lb_token( |
|
|
@ -311,11 +300,8 @@ typedef struct glb_lb_policy { |
|
|
|
* response has arrived. */ |
|
|
|
* response has arrived. */ |
|
|
|
grpc_grpclb_serverlist *serverlist; |
|
|
|
grpc_grpclb_serverlist *serverlist; |
|
|
|
|
|
|
|
|
|
|
|
/** total number of valid addresses received in \a serverlist */ |
|
|
|
/** addresses from \a serverlist */ |
|
|
|
size_t num_ok_serverlist_addresses; |
|
|
|
grpc_lb_addresses *addresses; |
|
|
|
|
|
|
|
|
|
|
|
/** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */ |
|
|
|
|
|
|
|
grpc_lb_address *lb_addresses; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** list of picks that are waiting on RR's policy connectivity */ |
|
|
|
/** list of picks that are waiting on RR's policy connectivity */ |
|
|
|
pending_pick *pending_picks; |
|
|
|
pending_pick *pending_picks; |
|
|
@ -368,26 +354,18 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* populate \a addresses according to \a serverlist. Returns the number of
|
|
|
|
/* Returns addresses extracted from \a serverlist. */ |
|
|
|
* addresses successfully parsed and added to \a addresses */ |
|
|
|
static grpc_lb_addresses *process_serverlist( |
|
|
|
static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
const grpc_grpclb_serverlist *serverlist) { |
|
|
|
grpc_lb_address **lb_addresses) { |
|
|
|
|
|
|
|
size_t num_valid = 0; |
|
|
|
size_t num_valid = 0; |
|
|
|
/* first pass: count how many are valid in order to allocate the necessary
|
|
|
|
/* first pass: count how many are valid in order to allocate the necessary
|
|
|
|
* memory in a single block */ |
|
|
|
* memory in a single block */ |
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; |
|
|
|
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; |
|
|
|
} |
|
|
|
} |
|
|
|
if (num_valid == 0) { |
|
|
|
if (num_valid == 0) return NULL; |
|
|
|
return 0; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* allocate the memory block for the "resolved" addresses. */ |
|
|
|
grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid); |
|
|
|
grpc_resolved_address *r_addrs_memblock = |
|
|
|
|
|
|
|
gpr_malloc(sizeof(grpc_resolved_address) * num_valid); |
|
|
|
|
|
|
|
memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid); |
|
|
|
|
|
|
|
grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid); |
|
|
|
|
|
|
|
memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* second pass: actually populate the addresses and LB tokens (aka user data
|
|
|
|
/* second pass: actually populate the addresses and LB tokens (aka user data
|
|
|
|
* to the outside world) to be read by the RR policy during its creation. |
|
|
|
* to the outside world) to be read by the RR policy during its creation. |
|
|
@ -399,56 +377,58 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, |
|
|
|
GPR_ASSERT(addr_idx < num_valid); |
|
|
|
GPR_ASSERT(addr_idx < num_valid); |
|
|
|
const grpc_grpclb_server *server = serverlist->servers[sl_idx]; |
|
|
|
const grpc_grpclb_server *server = serverlist->servers[sl_idx]; |
|
|
|
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; |
|
|
|
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; |
|
|
|
grpc_lb_address *const lb_addr = &lb_addrs[addr_idx]; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* address processing */ |
|
|
|
/* address processing */ |
|
|
|
const uint16_t netorder_port = htons((uint16_t)server->port); |
|
|
|
const uint16_t netorder_port = htons((uint16_t)server->port); |
|
|
|
/* the addresses are given in binary format (a in(6)_addr struct) in
|
|
|
|
/* the addresses are given in binary format (a in(6)_addr struct) in
|
|
|
|
* server->ip_address.bytes. */ |
|
|
|
* server->ip_address.bytes. */ |
|
|
|
const grpc_grpclb_ip_address *ip = &server->ip_address; |
|
|
|
const grpc_grpclb_ip_address *ip = &server->ip_address; |
|
|
|
|
|
|
|
grpc_resolved_address addr; |
|
|
|
lb_addr->resolved_address = &r_addrs_memblock[addr_idx]; |
|
|
|
memset(&addr, 0, sizeof(addr)); |
|
|
|
struct sockaddr_storage *sa = |
|
|
|
|
|
|
|
(struct sockaddr_storage *)lb_addr->resolved_address->addr; |
|
|
|
|
|
|
|
size_t *sa_len = &lb_addr->resolved_address->len; |
|
|
|
|
|
|
|
*sa_len = 0; |
|
|
|
|
|
|
|
if (ip->size == 4) { |
|
|
|
if (ip->size == 4) { |
|
|
|
struct sockaddr_in *addr4 = (struct sockaddr_in *)sa; |
|
|
|
addr.len = sizeof(struct sockaddr_in); |
|
|
|
*sa_len = sizeof(struct sockaddr_in); |
|
|
|
struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; |
|
|
|
memset(addr4, 0, *sa_len); |
|
|
|
|
|
|
|
addr4->sin_family = AF_INET; |
|
|
|
addr4->sin_family = AF_INET; |
|
|
|
memcpy(&addr4->sin_addr, ip->bytes, ip->size); |
|
|
|
memcpy(&addr4->sin_addr, ip->bytes, ip->size); |
|
|
|
addr4->sin_port = netorder_port; |
|
|
|
addr4->sin_port = netorder_port; |
|
|
|
} else if (ip->size == 16) { |
|
|
|
} else if (ip->size == 16) { |
|
|
|
struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa; |
|
|
|
addr.len = sizeof(struct sockaddr_in6); |
|
|
|
*sa_len = sizeof(struct sockaddr_in6); |
|
|
|
struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; |
|
|
|
memset(addr6, 0, *sa_len); |
|
|
|
|
|
|
|
addr6->sin6_family = AF_INET; |
|
|
|
addr6->sin6_family = AF_INET; |
|
|
|
memcpy(&addr6->sin6_addr, ip->bytes, ip->size); |
|
|
|
memcpy(&addr6->sin6_addr, ip->bytes, ip->size); |
|
|
|
addr6->sin6_port = netorder_port; |
|
|
|
addr6->sin6_port = netorder_port; |
|
|
|
} |
|
|
|
} |
|
|
|
GPR_ASSERT(*sa_len > 0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* lb token processing */ |
|
|
|
/* lb token processing */ |
|
|
|
|
|
|
|
void *user_data; |
|
|
|
if (server->has_load_balance_token) { |
|
|
|
if (server->has_load_balance_token) { |
|
|
|
const size_t lb_token_size = |
|
|
|
const size_t lb_token_size = |
|
|
|
GPR_ARRAY_SIZE(server->load_balance_token) - 1; |
|
|
|
GPR_ARRAY_SIZE(server->load_balance_token) - 1; |
|
|
|
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( |
|
|
|
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( |
|
|
|
(uint8_t *)server->load_balance_token, lb_token_size); |
|
|
|
(uint8_t *)server->load_balance_token, lb_token_size); |
|
|
|
lb_addr->user_data = grpc_mdelem_from_metadata_strings( |
|
|
|
user_data = grpc_mdelem_from_metadata_strings( |
|
|
|
GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); |
|
|
|
GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"Missing LB token for backend address '%s'. The empty token will " |
|
|
|
"Missing LB token for backend address '%s'. The empty token will " |
|
|
|
"be used instead", |
|
|
|
"be used instead", |
|
|
|
grpc_sockaddr_to_uri((struct sockaddr *)sa)); |
|
|
|
grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr)); |
|
|
|
lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; |
|
|
|
user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, |
|
|
|
|
|
|
|
false /* is_balancer */, |
|
|
|
|
|
|
|
NULL /* balancer_name */, user_data); |
|
|
|
++addr_idx; |
|
|
|
++addr_idx; |
|
|
|
} |
|
|
|
} |
|
|
|
GPR_ASSERT(addr_idx == num_valid); |
|
|
|
GPR_ASSERT(addr_idx == num_valid); |
|
|
|
*lb_addresses = lb_addrs; |
|
|
|
|
|
|
|
return num_valid; |
|
|
|
return lb_addresses; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */ |
|
|
|
|
|
|
|
static void lb_token_destroy(void *token) { |
|
|
|
|
|
|
|
if (token != NULL) GRPC_MDELEM_UNREF(token); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
|
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
@ -459,19 +439,15 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_lb_policy_args args; |
|
|
|
grpc_lb_policy_args args; |
|
|
|
memset(&args, 0, sizeof(args)); |
|
|
|
memset(&args, 0, sizeof(args)); |
|
|
|
args.client_channel_factory = glb_policy->cc_factory; |
|
|
|
args.client_channel_factory = glb_policy->cc_factory; |
|
|
|
const size_t num_ok_addresses = |
|
|
|
args.addresses = process_serverlist(serverlist); |
|
|
|
process_serverlist(serverlist, &args.addresses); |
|
|
|
|
|
|
|
args.num_addresses = num_ok_addresses; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); |
|
|
|
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); |
|
|
|
|
|
|
|
|
|
|
|
if (glb_policy->lb_addresses != NULL) { |
|
|
|
if (glb_policy->addresses != NULL) { |
|
|
|
/* dispose of the previous version */ |
|
|
|
/* dispose of the previous version */ |
|
|
|
lb_addrs_destroy(glb_policy->lb_addresses, |
|
|
|
grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); |
|
|
|
glb_policy->num_ok_serverlist_addresses); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
glb_policy->num_ok_serverlist_addresses = num_ok_addresses; |
|
|
|
glb_policy->addresses = args.addresses; |
|
|
|
glb_policy->lb_addresses = args.addresses; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return rr; |
|
|
|
return rr; |
|
|
|
} |
|
|
|
} |
|
|
@ -565,6 +541,19 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
grpc_lb_policy_factory *factory, |
|
|
|
grpc_lb_policy_factory *factory, |
|
|
|
grpc_lb_policy_args *args) { |
|
|
|
grpc_lb_policy_args *args) { |
|
|
|
|
|
|
|
/* Count the number of gRPC-LB addresses. There must be at least one.
|
|
|
|
|
|
|
|
* TODO(roth): For now, we ignore non-balancer addresses, but in the |
|
|
|
|
|
|
|
* future, we may change the behavior such that we fall back to using |
|
|
|
|
|
|
|
* the non-balancer addresses if we cannot reach any balancers. At that |
|
|
|
|
|
|
|
* time, this should be changed to allow a list with no balancer addresses, |
|
|
|
|
|
|
|
* since the resolver might fail to return a balancer address even when |
|
|
|
|
|
|
|
* this is the right LB policy to use. */ |
|
|
|
|
|
|
|
size_t num_grpclb_addrs = 0; |
|
|
|
|
|
|
|
for (size_t i = 0; i < args->addresses->num_addresses; ++i) { |
|
|
|
|
|
|
|
if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (num_grpclb_addrs == 0) return NULL; |
|
|
|
|
|
|
|
|
|
|
|
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); |
|
|
|
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); |
|
|
|
memset(glb_policy, 0, sizeof(*glb_policy)); |
|
|
|
memset(glb_policy, 0, sizeof(*glb_policy)); |
|
|
|
|
|
|
|
|
|
|
@ -576,36 +565,34 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
* Create a client channel over them to communicate with a LB service */ |
|
|
|
* Create a client channel over them to communicate with a LB service */ |
|
|
|
glb_policy->cc_factory = args->client_channel_factory; |
|
|
|
glb_policy->cc_factory = args->client_channel_factory; |
|
|
|
GPR_ASSERT(glb_policy->cc_factory != NULL); |
|
|
|
GPR_ASSERT(glb_policy->cc_factory != NULL); |
|
|
|
if (args->num_addresses == 0) { |
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (args->addresses[0].user_data != NULL) { |
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
|
|
|
"This LB policy doesn't support user data. It will be ignored"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* construct a target from the addresses in args, given in the form
|
|
|
|
/* construct a target from the addresses in args, given in the form
|
|
|
|
* ipvX://ip1:port1,ip2:port2,...
|
|
|
|
* ipvX://ip1:port1,ip2:port2,...
|
|
|
|
* TODO(dgq): support mixed ip version */ |
|
|
|
* TODO(dgq): support mixed ip version */ |
|
|
|
char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); |
|
|
|
char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); |
|
|
|
addr_strs[0] = grpc_sockaddr_to_uri( |
|
|
|
size_t addr_index = 0; |
|
|
|
(const struct sockaddr *)&args->addresses[0].resolved_address->addr); |
|
|
|
for (size_t i = 0; i < args->addresses->num_addresses; i++) { |
|
|
|
for (size_t i = 1; i < args->num_addresses; i++) { |
|
|
|
if (args->addresses->addresses[i].user_data != NULL) { |
|
|
|
if (args->addresses[i].user_data != NULL) { |
|
|
|
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
"This LB policy doesn't support user data. It will be ignored"); |
|
|
|
"This LB policy doesn't support user data. It will be ignored"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (args->addresses->addresses[i].is_balancer) { |
|
|
|
GPR_ASSERT( |
|
|
|
if (addr_index == 0) { |
|
|
|
grpc_sockaddr_to_string( |
|
|
|
addr_strs[addr_index++] = grpc_sockaddr_to_uri( |
|
|
|
&addr_strs[i], |
|
|
|
(const struct sockaddr *)&args->addresses->addresses[i] |
|
|
|
(const struct sockaddr *)&args->addresses[i].resolved_address->addr, |
|
|
|
.address.addr); |
|
|
|
true) == 0); |
|
|
|
} else { |
|
|
|
|
|
|
|
GPR_ASSERT(grpc_sockaddr_to_string( |
|
|
|
|
|
|
|
&addr_strs[addr_index++], |
|
|
|
|
|
|
|
(const struct sockaddr *)&args->addresses->addresses[i] |
|
|
|
|
|
|
|
.address.addr, |
|
|
|
|
|
|
|
true) == 0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
size_t uri_path_len; |
|
|
|
size_t uri_path_len; |
|
|
|
char *target_uri_str = gpr_strjoin_sep( |
|
|
|
char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, |
|
|
|
(const char **)addr_strs, args->num_addresses, ",", &uri_path_len); |
|
|
|
num_grpclb_addrs, ",", &uri_path_len); |
|
|
|
|
|
|
|
|
|
|
|
/* will pick using pick_first */ |
|
|
|
/* will pick using pick_first */ |
|
|
|
glb_policy->lb_channel = grpc_client_channel_factory_create_channel( |
|
|
|
glb_policy->lb_channel = grpc_client_channel_factory_create_channel( |
|
|
@ -613,7 +600,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, |
|
|
|
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); |
|
|
|
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); |
|
|
|
|
|
|
|
|
|
|
|
gpr_free(target_uri_str); |
|
|
|
gpr_free(target_uri_str); |
|
|
|
for (size_t i = 0; i < args->num_addresses; i++) { |
|
|
|
for (size_t i = 0; i < num_grpclb_addrs; i++) { |
|
|
|
gpr_free(addr_strs[i]); |
|
|
|
gpr_free(addr_strs[i]); |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_free(addr_strs); |
|
|
|
gpr_free(addr_strs); |
|
|
@ -649,9 +636,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); |
|
|
|
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); |
|
|
|
} |
|
|
|
} |
|
|
|
gpr_mu_destroy(&glb_policy->mu); |
|
|
|
gpr_mu_destroy(&glb_policy->mu); |
|
|
|
|
|
|
|
grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); |
|
|
|
lb_addrs_destroy(glb_policy->lb_addresses, |
|
|
|
|
|
|
|
glb_policy->num_ok_serverlist_addresses); |
|
|
|
|
|
|
|
gpr_free(glb_policy); |
|
|
|
gpr_free(glb_policy); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|