Moved LB token changes solely into grpclb.c

commit 331b9c02f9 (pull/7746/head)
parent 42adfb89ce
Author: David Garcia Quintas, 8 years ago
Changed files (diff lines per file):
  1. src/core/ext/client_config/client_channel.c (2)
  2. src/core/ext/client_config/lb_policy.c (5)
  3. src/core/ext/client_config/lb_policy.h (20)
  4. src/core/ext/client_config/lb_policy_factory.h (25)
  5. src/core/ext/lb_policy/grpclb/grpclb.c (198)
  6. src/core/ext/lb_policy/pick_first/pick_first.c (18)
  7. src/core/ext/lb_policy/round_robin/round_robin.c (89)
  8. src/core/ext/resolver/dns/native/dns_resolver.c (9)
  9. src/core/ext/resolver/sockaddr/sockaddr_resolver.c (10)
  10. src/core/lib/transport/metadata.c (18)
  11. src/core/lib/transport/metadata.h (2)

@@ -572,7 +572,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                             initial_metadata_flags,
                             &calld->lb_token_mdelem};
   r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
-                          on_ready);
+                          NULL, on_ready);
   GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
   GPR_TIMER_END("pick_subchannel", 0);
   return r;

@@ -101,9 +101,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
 int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                         const grpc_lb_policy_pick_args *pick_args,
-                        grpc_connected_subchannel **target,
+                        grpc_connected_subchannel **target, void **user_data,
                         grpc_closure *on_complete) {
-  return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete);
+  return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
+                              on_complete);
 }
 
 void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,

@@ -72,7 +72,8 @@ struct grpc_lb_policy_vtable {
   /** \see grpc_lb_policy_pick */
   int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
               const grpc_lb_policy_pick_args *pick_args,
-              grpc_connected_subchannel **target, grpc_closure *on_complete);
+              grpc_connected_subchannel **target, void **user_data,
+              grpc_closure *on_complete);
 
   /** \see grpc_lb_policy_cancel_pick */
   void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -138,14 +139,19 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
                          const grpc_lb_policy_vtable *vtable);
 
 /** Find an appropriate target for this call, based on \a pick_args.
-    Upon completion \a on_complete will be called, with \a *target set to an
-    appropriate connected subchannel if the pick was successful or NULL
-    otherwise.
-    Picking can be asynchronous. Any IO should be done under \a
-    pick_args->pollent. */
+    Picking can be synchronous or asynchronous. In the synchronous case, when a
+    pick is readily available, it'll be returned in \a target and a non-zero
+    value will be returned.
+    In the asynchronous case, zero is returned and \a on_complete will be
+    called once \a target and \a user_data have been set. Any IO should be done
+    under \a pick_args->pollent.
+    The opaque \a user_data output argument corresponds to information that may
+    need to be propagated from the LB policy. It may be NULL.
+    Errors are signaled by receiving a NULL \a *target. */
 int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                         const grpc_lb_policy_pick_args *pick_args,
-                        grpc_connected_subchannel **target,
+                        grpc_connected_subchannel **target, void **user_data,
                         grpc_closure *on_complete);
 
 /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
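The doc comment above describes a dual synchronous/asynchronous completion contract. A minimal, self-contained C sketch of that contract from the caller's side follows; every name in it (pick, closure_fn, on_pick_done) is invented for illustration and is not the gRPC API:

    #include <stdio.h>

    /* Invented model of the pick contract documented above: a non-zero return
     * means *target/*user_data were filled in synchronously; a zero return
     * means on_complete fires later, once both outputs have been set. */
    typedef void (*closure_fn)(void *arg);

    static int pick(int result_ready_now, const char **target, void **user_data,
                    closure_fn on_complete, void *cb_arg) {
      if (result_ready_now) {
        *target = "subchannel-A"; /* synchronous path: outputs set, non-zero */
        *user_data = NULL;
        return 1;
      }
      /* asynchronous path: a real policy would queue the pick and run
       * on_complete from another context; it is invoked inline for brevity. */
      *target = "subchannel-B";
      *user_data = NULL;
      on_complete(cb_arg);
      return 0;
    }

    static void on_pick_done(void *arg) {
      printf("async pick completed (%s)\n", (const char *)arg);
    }

    int main(void) {
      const char *target = NULL;
      void *user_data = NULL;
      if (pick(1, &target, &user_data, on_pick_done, "call-1") != 0) {
        printf("sync pick completed: %s\n", target);
      }
      return 0;
    }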

@@ -47,16 +47,25 @@ struct grpc_lb_policy_factory {
   const grpc_lb_policy_factory_vtable *vtable;
 };
 
-typedef struct grpc_lb_policy_address_token {
-  uint8_t *token;
-  size_t token_size;
-} grpc_lb_policy_address_token;
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not \a NULL, is opaque and meant to be consumed by the gRPC
+ * LB policy. Anywhere else, refer to the functions in \a
+ * grpc_lb_policy_user_data_vtable to operate with it. */
+typedef struct grpc_lb_address {
+  grpc_resolved_address *resolved_address;
+  void *user_data;
+} grpc_lb_address;
+
+/** Functions acting upon the opaque \a grpc_lb_address.user_data */
+typedef struct grpc_lb_policy_user_data_vtable {
+  void *(*copy)(void *);
+  void (*destroy)(void *);
+} grpc_lb_policy_user_data_vtable;
 
 typedef struct grpc_lb_policy_args {
-  grpc_resolved_addresses *addresses;
-  /* If not NULL, array of load balancing tokens associated with \a addresses,
-   * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */
-  grpc_lb_policy_address_token *tokens;
+  grpc_lb_address *lb_addresses;
+  size_t num_addresses;
+  grpc_lb_policy_user_data_vtable user_data_vtable;
   grpc_client_channel_factory *client_channel_factory;
 } grpc_lb_policy_args;
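The copy/destroy vtable lets whoever holds a grpc_lb_address manage user_data's lifetime without knowing its concrete type. A self-contained sketch of one possible vtable, using heap-allocated strings rather than the mdelem-based pair grpclb actually installs (all names here are invented):

    #include <stdlib.h>
    #include <string.h>

    /* Hypothetical user_data vtable for heap-allocated strings. grpclb's real
     * vtable refs/unrefs grpc_mdelem instances instead (see grpclb.c below). */
    typedef struct user_data_vtable {
      void *(*copy)(void *);
      void (*destroy)(void *);
    } user_data_vtable;

    static void *string_copy(void *user_data) {
      if (user_data == NULL) return NULL;
      return strdup((const char *)user_data); /* deep copy: each holder owns one */
    }

    static void string_destroy(void *user_data) {
      free(user_data); /* free(NULL) is a no-op, mirroring the NULL checks */
    }

    static const user_data_vtable string_vtable = {string_copy, string_destroy};

    int main(void) {
      void *original = strdup("lb-token-abc");
      void *copy = string_vtable.copy(original);
      string_vtable.destroy(original); /* the copy remains valid independently */
      string_vtable.destroy(copy);
      return 0;
    }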

@@ -116,14 +116,50 @@
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/call.h"
 #include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/static_metadata.h"
 
 int grpc_lb_glb_trace = 0;
 
+static void *user_data_copy(void *user_data) {
+  if (user_data == NULL) return NULL;
+  return GRPC_MDELEM_REF(user_data);
+}
+
+static void user_data_destroy(void *user_data) {
+  if (user_data == NULL) return;
+  GRPC_MDELEM_UNREF(user_data);
+}
+
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+    grpc_metadata_batch *initial_metadata,
+    grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
+  GPR_ASSERT(lb_token_mdelem_storage != NULL);
+  GPR_ASSERT(lb_token != NULL);
+  grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+                               lb_token);
+}
+
 typedef struct wrapped_rr_closure_arg {
   /* the original closure. Usually a on_complete/notify cb for pick() and
    * ping() calls against the internal RR instance, respectively. */
   grpc_closure *wrapped_closure;
 
+  /* the pick's initial metadata, kept in order to append the LB token for the
+   * pick */
+  grpc_metadata_batch *initial_metadata;
+
+  /* the picked target, used to determine which LB token to add to the pick's
+   * initial metadata */
+  grpc_connected_subchannel **target;
+
+  /* the LB token associated with the pick */
+  grpc_mdelem *lb_token;
+
+  /* storage for the lb token initial metadata mdelem */
+  grpc_linked_mdelem *lb_token_mdelem_storage;
+
   /* The RR instance related to the closure */
   grpc_lb_policy *rr_policy;
@@ -146,6 +182,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
     GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
   }
   GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+
+  initial_metadata_add_lb_token(wc_arg->initial_metadata,
+                                wc_arg->lb_token_mdelem_storage,
+                                wc_arg->lb_token);
+
   grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
   gpr_free(wc_arg->owning_pending_node);
 }
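This wrapping is the heart of the commit: grpclb hands its inner round_robin policy a closure of its own, so it can append the LB token to the call's initial metadata before the original on_complete runs. A dependency-free sketch of the same interception pattern (names invented, not the gRPC types):

    #include <stdio.h>

    /* Hypothetical closure-wrapping pattern: run extra work (here, "attach
     * token") before delegating to the originally supplied callback. */
    typedef void (*closure_fn)(void *arg);

    typedef struct wrapped_closure_arg {
      closure_fn wrapped_cb; /* the caller's original on_complete */
      void *wrapped_cb_arg;
      const char *lb_token;  /* extra state the wrapper needs */
    } wrapped_closure_arg;

    static void wrapped_closure(void *arg) {
      wrapped_closure_arg *wc = (wrapped_closure_arg *)arg;
      printf("attaching LB token '%s' to initial metadata\n", wc->lb_token);
      wc->wrapped_cb(wc->wrapped_cb_arg); /* then notify the original caller */
    }

    static void original_on_complete(void *arg) {
      printf("pick complete for call %s\n", (const char *)arg);
    }

    int main(void) {
      wrapped_closure_arg wc = {original_on_complete, (void *)"call-1",
                                "token-xyz"};
      /* the inner policy is handed wrapped_closure instead of the original: */
      wrapped_closure(&wc);
      return 0;
    }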
@@ -194,12 +235,15 @@ static void add_pending_pick(pending_pick **root,
   memset(pp, 0, sizeof(pending_pick));
   memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
   pp->next = *root;
-  pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
   pp->pollent = pick_args->pollent;
   pp->target = target;
   pp->initial_metadata = pick_args->initial_metadata;
   pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+  pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
   pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+  pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
+  pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
+      pick_args->lb_token_mdelem_storage;
   grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
                     &pp->wrapped_on_complete_arg);
   *root = pp;
@@ -285,16 +329,70 @@ struct rr_connectivity_data {
   glb_lb_policy *glb_policy;
 };
 
-static bool process_serverlist(const grpc_grpclb_server *server,
-                               struct sockaddr_storage *sa, size_t *sa_len) {
+/* populate \a lb_addresses according to \a serverlist. Returns the number of
+ * addresses successfully parsed and added to \a lb_addresses */
+static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
+                                 grpc_lb_address **lb_addresses) {
+  size_t num_valid = 0;
+  /* first pass: count how many are valid in order to allocate the necessary
+   * memory in a single block */
+  for (size_t i = 0; i < serverlist->num_servers; ++i) {
+    const grpc_grpclb_server *server = serverlist->servers[i];
+    const grpc_grpclb_ip_address *ip = &server->ip_address;
     if (server->port >> 16 != 0) {
-      gpr_log(GPR_ERROR, "Invalid port '%d'.", server->port);
-      return false;
+      gpr_log(GPR_ERROR,
+              "Invalid port '%d' at index %zu of serverlist. Ignoring.",
+              server->port, i);
+      continue;
     }
+    if (ip->size != 4 && ip->size != 16) {
+      gpr_log(GPR_ERROR,
+              "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
+              "serverlist. Ignoring.",
+              ip->size, i);
+      continue;
+    }
+    ++num_valid;
+  }
+  if (num_valid == 0) {
+    return 0;
+  }
+
+  /* allocate the memory block for the "resolved" addresses. */
+  grpc_resolved_address *r_addrs_memblock =
+      gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
+  memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
+  grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
+  memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
+
+  /* second pass: actually populate the addresses and LB tokens (aka user data
+   * to the outside world) to be read by the RR policy during its creation */
+  for (size_t i = 0; i < num_valid; ++i) {
+    const grpc_grpclb_server *server = serverlist->servers[i];
+    grpc_lb_address *const lb_addr = &lb_addrs[i];
+
+    /* lb token processing */
+    if (server->has_load_balance_token) {
+      const size_t lb_token_size =
+          GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+      grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
+          (uint8_t *)server->load_balance_token, lb_token_size);
+      lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+          GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
+    }
+
+    /* address processing */
     const uint16_t netorder_port = htons((uint16_t)server->port);
     /* the addresses are given in binary format (a in(6)_addr struct) in
      * server->ip_address.bytes. */
     const grpc_grpclb_ip_address *ip = &server->ip_address;
+    lb_addr->resolved_address = &r_addrs_memblock[i];
+    struct sockaddr_storage *sa =
+        (struct sockaddr_storage *)lb_addr->resolved_address->addr;
+    size_t *sa_len = &lb_addr->resolved_address->len;
     *sa_len = 0;
     if (ip->size == 4) {
       struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
@@ -303,19 +401,18 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
       addr4->sin_family = AF_INET;
       memcpy(&addr4->sin_addr, ip->bytes, ip->size);
       addr4->sin_port = netorder_port;
-  } else if (ip->size == 6) {
+    } else if (ip->size == 16) {
       struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
       *sa_len = sizeof(struct sockaddr_in6);
       memset(addr6, 0, *sa_len);
-      addr6->sin6_family = AF_INET;
+      addr6->sin6_family = AF_INET6;
       memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
       addr6->sin6_port = netorder_port;
-  } else {
-    gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes. Got %d.", ip->size);
-    return false;
     }
     GPR_ASSERT(*sa_len > 0);
-  return true;
   }
+  *lb_addresses = lb_addrs;
+  return num_valid;
 }
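process_serverlist validates in a first pass so both output blocks can be allocated exactly once, then populates in a second pass. Note that the second pass above indexes serverlist->servers[i] for i < num_valid, which implicitly assumes the valid entries form a prefix of the list. A self-contained sketch of the two-pass pattern, re-checking validity in the second pass instead (all names invented):

    #include <stdio.h>
    #include <stdlib.h>

    /* Hypothetical two-pass filter: count valid entries first, allocate one
     * exactly-sized block, then populate it in a second pass. */
    static size_t keep_positive(const int *in, size_t n, int **out) {
      size_t num_valid = 0;
      /* first pass: count, so the allocation below happens exactly once */
      for (size_t i = 0; i < n; ++i) {
        if (in[i] > 0) ++num_valid;
      }
      if (num_valid == 0) return 0;

      int *kept = malloc(sizeof(int) * num_valid);
      /* second pass: copy only the entries that passed validation */
      size_t j = 0;
      for (size_t i = 0; i < n; ++i) {
        if (in[i] > 0) kept[j++] = in[i];
      }
      *out = kept;
      return num_valid;
    }

    int main(void) {
      const int input[] = {3, -1, 7, 0, 5};
      int *kept = NULL;
      size_t n = keep_positive(input, 5, &kept);
      for (size_t i = 0; i < n; ++i) printf("%d ", kept[i]);
      printf("\n");
      free(kept);
      return 0;
    }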
@@ -326,36 +423,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
   grpc_lb_policy_args args;
   memset(&args, 0, sizeof(args));
   args.client_channel_factory = glb_policy->cc_factory;
-  args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) *
-                           serverlist->num_servers);
-  args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
-  args.addresses->addrs =
-      gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers);
-  size_t addr_idx = 0;
-  for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    const grpc_grpclb_server *server = serverlist->servers[i];
-    grpc_resolved_address *raddr = &args.addresses->addrs[addr_idx];
-    if (!process_serverlist(server, (struct sockaddr_storage *)raddr->addr,
-                            &raddr->len)) {
-      gpr_log(GPR_INFO,
-              "Problem processing server at index %zu of received serverlist, "
-              "ignoring.",
-              i);
-      continue;
-    }
-    ++addr_idx;
-    args.tokens[i].token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1;
-    args.tokens[i].token = gpr_malloc(args.tokens[i].token_size);
-    memcpy(args.tokens[i].token, server->load_balance_token,
-           args.tokens[i].token_size);
-  }
-  args.addresses->naddrs = addr_idx;
+  args.num_addresses = process_serverlist(serverlist, &args.lb_addresses);
+  args.user_data_vtable.copy = user_data_copy;
+  args.user_data_vtable.destroy = user_data_destroy;
 
   grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
 
-  gpr_free(args.addresses->addrs);
-  gpr_free(args.addresses);
-  gpr_free(args.tokens);
+  if (args.num_addresses > 0) {
+    /* free "resolved" addresses memblock */
+    gpr_free(args.lb_addresses->resolved_address);
+  }
+  for (size_t i = 0; i < args.num_addresses; ++i) {
+    args.user_data_vtable.destroy(args.lb_addresses[i].user_data);
+  }
+  gpr_free(args.lb_addresses);
   return rr;
 }
@@ -395,6 +475,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
         pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
         pp->lb_token_mdelem_storage};
     grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+                        (void **)&pp->wrapped_on_complete_arg.lb_token,
                         &pp->wrapped_on_complete);
     pp->wrapped_on_complete_arg.owning_pending_node = pp;
   }
@@ -457,25 +538,26 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
    * Create a client channel over them to communicate with a LB service */
   glb_policy->cc_factory = args->client_channel_factory;
   GPR_ASSERT(glb_policy->cc_factory != NULL);
-  if (args->addresses->naddrs == 0) {
+  if (args->num_addresses == 0) {
     return NULL;
   }
 
-  /* construct a target from the args->addresses, in the form
+  /* construct a target from the addresses in args, given in the form
    * ipvX://ip1:port1,ip2:port2,...
    * TODO(dgq): support mixed ip version */
-  char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
-  addr_strs[0] =
-      grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
-  for (size_t i = 1; i < args->addresses->naddrs; i++) {
-    GPR_ASSERT(grpc_sockaddr_to_string(
-                   &addr_strs[i],
-                   (const struct sockaddr *)&args->addresses->addrs[i],
-                   true) == 0);
+  char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
+  addr_strs[0] = grpc_sockaddr_to_uri(
+      (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr);
+  for (size_t i = 1; i < args->num_addresses; i++) {
+    GPR_ASSERT(
+        grpc_sockaddr_to_string(&addr_strs[i],
+                                (const struct sockaddr *)&args->lb_addresses[i]
+                                    .resolved_address->addr,
+                                true) == 0);
   }
   size_t uri_path_len;
   char *target_uri_str = gpr_strjoin_sep(
-      (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+      (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
 
   /* will pick using pick_first */
   glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -483,7 +565,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
       GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
   gpr_free(target_uri_str);
 
-  for (size_t i = 0; i < args->addresses->naddrs; i++) {
+  for (size_t i = 0; i < args->num_addresses; i++) {
     gpr_free(addr_strs[i]);
   }
   gpr_free(addr_strs);
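glb_create stringifies each balancer address and joins the strings with commas to form the ipvX://ip1:port1,ip2:port2 channel target. A standalone sketch of just the join step, standing in for gpr_strjoin_sep (all names invented):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Hypothetical stand-in for gpr_strjoin_sep: join strings with a
     * separator into a single freshly allocated string. */
    static char *join_with_sep(const char **strs, size_t n, const char *sep) {
      size_t total = 1; /* trailing NUL */
      for (size_t i = 0; i < n; ++i) {
        total += strlen(strs[i]) + (i ? strlen(sep) : 0);
      }
      char *out = malloc(total);
      out[0] = '\0';
      for (size_t i = 0; i < n; ++i) {
        if (i) strcat(out, sep);
        strcat(out, strs[i]);
      }
      return out;
    }

    int main(void) {
      const char *addrs[] = {"ipv4://10.0.0.1:50051", "10.0.0.2:50051"};
      char *target = join_with_sep(addrs, 2, ",");
      printf("%s\n", target); /* ipv4://10.0.0.1:50051,10.0.0.2:50051 */
      free(target);
      return 0;
    }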
@@ -635,7 +717,7 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                     const grpc_lb_policy_pick_args *pick_args,
-                    grpc_connected_subchannel **target,
+                    grpc_connected_subchannel **target, void **user_data,
                     grpc_closure *on_complete) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
@@ -662,22 +744,28 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
     glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
     glb_policy->wc_arg.wrapped_closure = on_complete;
+    glb_policy->wc_arg.lb_token_mdelem_storage =
+        pick_args->lb_token_mdelem_storage;
+    glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
     glb_policy->wc_arg.owning_pending_node = NULL;
     grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
                       &glb_policy->wc_arg);
 
     r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+                            (void **)&glb_policy->wc_arg.lb_token,
                             &glb_policy->wrapped_on_complete);
     if (r != 0) {
-      /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
-       * policy and notify the original callback */
-      glb_policy->wc_arg.wrapped_closure = NULL;
+      /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
       if (grpc_lb_glb_trace) {
        gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
                (intptr_t)glb_policy->wc_arg.rr_policy);
      }
      GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
-      grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
-                          GRPC_ERROR_NONE, NULL);
+
+      /* add the load reporting initial metadata */
+      initial_metadata_add_lb_token(pick_args->initial_metadata,
+                                    pick_args->lb_token_mdelem_storage,
+                                    glb_policy->wc_arg.lb_token);
     }
   } else {
     grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
@@ -200,7 +200,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                    const grpc_lb_policy_pick_args *pick_args,
-                   grpc_connected_subchannel **target,
+                   grpc_connected_subchannel **target, void **user_data,
                    grpc_closure *on_complete) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
@@ -438,23 +438,23 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
 static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
                                          grpc_lb_policy_factory *factory,
                                          grpc_lb_policy_args *args) {
-  GPR_ASSERT(args->addresses != NULL);
+  GPR_ASSERT(args->lb_addresses != NULL);
   GPR_ASSERT(args->client_channel_factory != NULL);
 
-  if (args->addresses->naddrs == 0) return NULL;
+  if (args->num_addresses == 0) return NULL;
 
   pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
   memset(p, 0, sizeof(*p));
 
-  p->subchannels =
-      gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
-  memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+  p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
+  memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
   grpc_subchannel_args sc_args;
   size_t subchannel_idx = 0;
-  for (size_t i = 0; i < args->addresses->naddrs; i++) {
+  for (size_t i = 0; i < args->num_addresses; i++) {
     memset(&sc_args, 0, sizeof(grpc_subchannel_args));
-    sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
-    sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+    sc_args.addr =
+        (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr);
+    sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len;
     grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
         exec_ctx, args->client_channel_factory, &sc_args);
@@ -84,8 +84,10 @@ typedef struct pending_pick {
   /* the initial metadata for the pick. See grpc_lb_policy_pick() */
   grpc_metadata_batch *initial_metadata;
 
-  /* storage for the lb token initial metadata mdelem */
-  grpc_linked_mdelem *lb_token_mdelem_storage;
+  /* output argument where to store the pick()ed user_data. It'll be NULL if no
+   * such data is present or there's an error (the definite test for errors is
+   * \a target being NULL). */
+  void **user_data;
 
   /* bitmask passed to pick() and used for selective cancelling. See
    * grpc_lb_policy_cancel_picks() */
@@ -103,7 +105,7 @@ typedef struct pending_pick {
 typedef struct ready_list {
   grpc_subchannel *subchannel;
   /* references namesake entry in subchannel_data */
-  grpc_lb_policy_address_token *lb_token;
+  void *user_data;
   struct ready_list *next;
   struct ready_list *prev;
 } ready_list;
@@ -121,8 +123,8 @@ typedef struct {
   ready_list *ready_list_node;
   /** last observed connectivity */
   grpc_connectivity_state connectivity_state;
-  /** the subchannel's target LB token */
-  grpc_lb_policy_address_token *lb_token;
+  /** the subchannel's target user data */
+  void *user_data;
 } subchannel_data;
 
 struct round_robin_lb_policy {
@@ -131,8 +133,10 @@ struct round_robin_lb_policy {
   /** total number of addresses received at creation time */
   size_t num_addresses;
 
-  /** load balancing tokens, one per incoming address */
-  grpc_lb_policy_address_token *lb_tokens;
+  /** user data, one per incoming address */
+  void **user_data;
+  /** functions to operate over \a user_data elements */
+  grpc_lb_policy_user_data_vtable user_data_vtable;
 
   /** all our subchannels */
   size_t num_subchannels;
@@ -204,7 +208,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
   ready_list *new_elem = gpr_malloc(sizeof(ready_list));
   memset(new_elem, 0, sizeof(ready_list));
   new_elem->subchannel = sd->subchannel;
-  new_elem->lb_token = sd->lb_token;
+  new_elem->user_data = sd->user_data;
   if (p->ready_list.prev == NULL) {
     /* first element */
     new_elem->next = &p->ready_list;
@@ -246,7 +250,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
   }
 
   if (grpc_lb_round_robin_trace) {
-    gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
+    gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
             (void *)node->subchannel);
   }
@@ -259,9 +263,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
 static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
-  size_t i;
   ready_list *elem;
-  for (i = 0; i < p->num_subchannels; i++) {
+  for (size_t i = 0; i < p->num_subchannels; i++) {
     subchannel_data *sd = p->subchannels[i];
     GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
     gpr_free(sd);
@@ -282,12 +285,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     elem = tmp;
   }
 
-  if (p->lb_tokens != NULL) {
-    for (i = 0; i < p->num_addresses; i++) {
-      gpr_free(p->lb_tokens[i].token);
-    }
-    gpr_free(p->lb_tokens);
+  for (size_t i = 0; i < p->num_addresses; i++) {
+    p->user_data_vtable.destroy(p->user_data[i]);
   }
+  gpr_free(p->user_data);
 
   gpr_free(p);
 }
@@ -397,26 +398,9 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   gpr_mu_unlock(&p->mu);
 }
 
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static void initial_metadata_add_lb_token(
-    grpc_metadata_batch *initial_metadata,
-    grpc_linked_mdelem *lb_token_mdelem_storage,
-    grpc_lb_policy_address_token *lb_token) {
-  if (lb_token != NULL && lb_token->token_size > 0) {
-    GPR_ASSERT(lb_token->token != NULL);
-    grpc_mdstr *lb_token_mdstr =
-        grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
-    grpc_metadata_batch_add_tail(
-        initial_metadata, lb_token_mdelem_storage,
-        grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
-                                          lb_token_mdstr));
-  }
-}
-
 static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                    const grpc_lb_policy_pick_args *pick_args,
-                   grpc_connected_subchannel **target,
+                   grpc_connected_subchannel **target, void **user_data,
                    grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
   pending_pick *pp;
@@ -426,9 +410,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     /* readily available, report right away */
     gpr_mu_unlock(&p->mu);
     *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
-    initial_metadata_add_lb_token(pick_args->initial_metadata,
-                                  pick_args->lb_token_mdelem_storage,
-                                  selected->lb_token);
+    *user_data = p->user_data_vtable.copy(selected->user_data);
     if (grpc_lb_round_robin_trace) {
       gpr_log(GPR_DEBUG,
               "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
@@ -451,7 +433,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     pp->on_complete = on_complete;
     pp->initial_metadata = pick_args->initial_metadata;
     pp->initial_metadata_flags = pick_args->initial_metadata_flags;
-    pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+    pp->user_data = user_data;
     p->pending_picks = pp;
     gpr_mu_unlock(&p->mu);
     return 0;
@@ -493,11 +475,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          initial_metadata_add_lb_token(pp->initial_metadata,
-                                        pp->lb_token_mdelem_storage,
-                                        selected->lb_token);
           *pp->target =
               grpc_subchannel_get_connected_subchannel(selected->subchannel);
+          *pp->user_data = p->user_data_vtable.copy(selected->user_data);
           if (grpc_lb_round_robin_trace) {
             gpr_log(GPR_DEBUG,
                     "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
@@ -631,30 +611,29 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
 static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
                                           grpc_lb_policy_factory *factory,
                                           grpc_lb_policy_args *args) {
-  GPR_ASSERT(args->addresses != NULL);
+  GPR_ASSERT(args->lb_addresses != NULL);
   GPR_ASSERT(args->client_channel_factory != NULL);
+  if (args->num_addresses == 0) return NULL;
 
   round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
   memset(p, 0, sizeof(*p));
 
-  p->num_addresses = args->addresses->naddrs;
-  if (args->tokens != NULL) {
-    /* we need to copy because args contents aren't owned */
-    p->lb_tokens =
-        gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
-    memcpy(p->lb_tokens, args->tokens,
-           sizeof(grpc_lb_policy_address_token) * p->num_addresses);
-  }
+  p->num_addresses = args->num_addresses;
   p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
   memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
+  p->user_data = gpr_malloc(sizeof(void *) * p->num_addresses);
+  memset(p->user_data, 0, sizeof(void *) * p->num_addresses);
+  p->user_data_vtable = args->user_data_vtable;
 
   grpc_subchannel_args sc_args;
   size_t subchannel_idx = 0;
   for (size_t i = 0; i < p->num_addresses; i++) {
     memset(&sc_args, 0, sizeof(grpc_subchannel_args));
-    sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
-    sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+    sc_args.addr =
+        (struct sockaddr *)args->lb_addresses[i].resolved_address->addr;
+    sc_args.addr_len = args->lb_addresses[i].resolved_address->len;
+    p->user_data[i] = p->user_data_vtable.copy(args->lb_addresses[i].user_data);
     grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
         exec_ctx, args->client_channel_factory, &sc_args);
 
@@ -666,9 +645,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
       sd->policy = p;
       sd->index = subchannel_idx;
       sd->subchannel = subchannel;
-      if (p->lb_tokens != NULL) {
-        sd->lb_token = &p->lb_tokens[i];
-      }
+      sd->user_data = p->user_data[i];
       ++subchannel_idx;
       grpc_closure_init(&sd->connectivity_changed_closure,
                         rr_connectivity_changed, sd);

@@ -175,7 +175,14 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
     grpc_lb_policy_args lb_policy_args;
     result = grpc_resolver_result_create();
     memset(&lb_policy_args, 0, sizeof(lb_policy_args));
-    lb_policy_args.addresses = addresses;
+    lb_policy_args.num_addresses = addresses->naddrs;
+    lb_policy_args.lb_addresses =
+        gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+    memset(lb_policy_args.lb_addresses, 0,
+           sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+    for (size_t i = 0; i < addresses->naddrs; ++i) {
+      lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i];
+    }
     lb_policy_args.client_channel_factory = r->client_channel_factory;
     lb_policy =
         grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);

@@ -125,10 +125,18 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
     grpc_resolver_result *result = grpc_resolver_result_create();
     grpc_lb_policy_args lb_policy_args;
     memset(&lb_policy_args, 0, sizeof(lb_policy_args));
-    lb_policy_args.addresses = r->addresses;
+    lb_policy_args.num_addresses = r->addresses->naddrs;
+    lb_policy_args.lb_addresses =
+        gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+    memset(lb_policy_args.lb_addresses, 0,
+           sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+    for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) {
+      lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i];
+    }
     lb_policy_args.client_channel_factory = r->client_channel_factory;
     grpc_lb_policy *lb_policy =
         grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+    gpr_free(lb_policy_args.lb_addresses);
     grpc_resolver_result_set_lb_policy(result, lb_policy);
     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
     r->published = 1;

@@ -278,7 +278,7 @@ static void ref_md_locked(mdtab_shard *shard,
                           internal_metadata *md DEBUG_ARGS) {
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-          "ELM REF:%p:%d->%d: '%s' = '%s'", md,
+          "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
           gpr_atm_no_barrier_load(&md->refcnt),
           gpr_atm_no_barrier_load(&md->refcnt) + 1,
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
@@ -566,7 +566,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey,
   shard->elems[idx] = md;
   gpr_mu_init(&md->mu_user_data);
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
-  gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md,
+  gpr_log(GPR_DEBUG, "ELM NEW:%p:%zu: '%s' = '%s'", (void *)md,
           gpr_atm_no_barrier_load(&md->refcnt),
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
           grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
@@ -639,7 +639,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
   if (is_mdelem_static(gmd)) return gmd;
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-          "ELM REF:%p:%d->%d: '%s' = '%s'", md,
+          "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
           gpr_atm_no_barrier_load(&md->refcnt),
           gpr_atm_no_barrier_load(&md->refcnt) + 1,
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
@@ -649,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
      this function - meaning that no adjustment to mdtab_free is necessary,
      simplifying the logic here to be just an atomic increment */
   /* use C assert to have this removed in opt builds */
-  assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
+  GPR_ASSERT(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
   gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
   return gmd;
 }
@@ -660,14 +660,16 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
   if (is_mdelem_static(gmd)) return;
 #ifdef GRPC_METADATA_REFCOUNT_DEBUG
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-          "ELM UNREF:%p:%d->%d: '%s' = '%s'", md,
+          "ELM UNREF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
           gpr_atm_no_barrier_load(&md->refcnt),
           gpr_atm_no_barrier_load(&md->refcnt) - 1,
           grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
           grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
 #endif
   uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash);
-  if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
+  const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1);
+  GPR_ASSERT(prev_refcount >= 1);
+  if (1 == prev_refcount) {
     /* once the refcount hits zero, some other thread can come along and
        free md at any time: it's unsafe from this point on to access it */
     mdtab_shard *shard =
@@ -676,10 +678,12 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
   }
 }
 
-const char *grpc_mdstr_as_c_string(grpc_mdstr *s) {
+const char *grpc_mdstr_as_c_string(const grpc_mdstr *s) {
   return (const char *)GPR_SLICE_START_PTR(s->slice);
 }
 
+size_t grpc_mdstr_length(const grpc_mdstr *s) { return GRPC_MDSTR_LENGTH(s); }
+
 grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) {
   internal_string *s = (internal_string *)gs;
   if (is_mdstr_static(gs)) return gs;
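The grpc_mdelem_unref change above captures the value returned by the atomic fetch-and-add so it can assert against refcount underflow before acting on it. A standalone C11 model of the same release pattern (names invented; atomic_fetch_sub, like gpr_atm_full_fetch_add, returns the previous value):

    #include <assert.h>
    #include <stdatomic.h>
    #include <stdio.h>

    /* Hypothetical refcounted object mirroring the unref pattern above. */
    typedef struct refcounted {
      atomic_long refcnt;
    } refcounted;

    static void refcounted_unref(refcounted *obj) {
      /* atomic_fetch_sub returns the value *before* the decrement, just like
       * gpr_atm_full_fetch_add(&md->refcnt, -1) returns the previous count. */
      long prev_refcount = atomic_fetch_sub(&obj->refcnt, 1);
      assert(prev_refcount >= 1); /* underflow means a ref/unref imbalance */
      if (prev_refcount == 1) {
        /* last reference dropped; from here on other threads must not touch
         * obj, so any cleanup happens exactly once, right now. */
        printf("freeing object %p\n", (void *)obj);
      }
    }

    int main(void) {
      refcounted obj;
      atomic_init(&obj.refcnt, 2);
      refcounted_unref(&obj); /* 2 -> 1: still alive */
      refcounted_unref(&obj); /* 1 -> 0: triggers cleanup */
      return 0;
    }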

@@ -147,7 +147,7 @@ void grpc_mdelem_unref(grpc_mdelem *md);
 /* Recover a char* from a grpc_mdstr. The returned string is null terminated.
    Does not promise that the returned string has no embedded nulls however. */
-const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
+const char *grpc_mdstr_as_c_string(const grpc_mdstr *s);
 
 #define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice))
