Merge pull request #7746 from dgquintas/lb_add_md

Updates to gRPC LB
pull/8112/head
David G. Quintas 9 years ago committed by GitHub
commit b410f80bc4
  1. 10
      src/core/ext/client_config/client_channel.c
  2. 10
      src/core/ext/client_config/lb_policy.c
  3. 60
      src/core/ext/client_config/lb_policy.h
  4. 13
      src/core/ext/client_config/lb_policy_factory.h
  5. 384
      src/core/ext/lb_policy/grpclb/grpclb.c
  6. 5
      src/core/ext/lb_policy/grpclb/load_balancer_api.c
  7. 1
      src/core/ext/lb_policy/grpclb/load_balancer_api.h
  8. 8
      src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
  9. 30
      src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
  10. 31
      src/core/ext/lb_policy/pick_first/pick_first.c
  11. 95
      src/core/ext/lb_policy/round_robin/round_robin.c
  12. 10
      src/core/ext/resolver/dns/native/dns_resolver.c
  13. 10
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  14. 18
      src/core/lib/transport/metadata.c
  15. 2
      src/core/lib/transport/metadata.h
  16. 4
      src/proto/grpc/lb/v1/load_balancer.options
  17. 29
      src/proto/grpc/lb/v1/load_balancer.proto
  18. 44
      test/cpp/grpclb/grpclb_api_test.cc
  19. 154
      test/cpp/grpclb/grpclb_test.cc
  20. 2
      third_party/nanopb
  21. 2
      tools/codegen/core/gen_nano_proto.sh
  22. 2
      tools/run_tests/sanity/check_submodules.sh

@ -394,6 +394,8 @@ typedef struct client_channel_call_data {
grpc_closure next_step;
grpc_call_stack *owning_call;
grpc_linked_mdelem lb_token_mdelem;
} call_data;
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
@ -566,9 +568,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
initial_metadata_flags,
&calld->lb_token_mdelem};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;

@ -100,13 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
initial_metadata_flags, target, on_complete);
return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,

@ -53,23 +53,38 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties;
};
/** Extra arguments for an LB pick */
typedef struct grpc_lb_policy_pick_args {
/** Parties interested in the pick's progress */
grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata;
/** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
uint32_t initial_metadata_flags;
/** Storage for LB token in \a initial_metadata, or NULL if not used */
grpc_linked_mdelem *lb_token_mdelem_storage;
} grpc_lb_policy_pick_args;
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** implement grpc_lb_policy_pick */
/** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete);
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
/** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@ -83,8 +98,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
state cancels the subscription.
*/
state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@ -124,26 +138,34 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
\a target.
Picking can be asynchronous. Any IO should be done under \a pollent. */
/** Find an appropriate target for this call, based on \a pick_args.
Picking can be synchronous or asynchronous. In the synchronous case, when a
pick is readily available, it'll be returned in \a target and a non-zero
value will be returned.
In the asynchronous case, zero is returned and \a on_complete will be called
once \a target and \a user_data have been set. Any IO should be done under
\a pick_args->pollent. The opaque \a user_data output argument corresponds
to information that may need be propagated from the LB policy. It may be
NULL. Errors are signaled by receiving a NULL \a *target. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
/** Cancel picks for \a target.
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
/** Cancel all pending picks which have:
(initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq */
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,

@ -47,8 +47,19 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
/** A resolved address alongside any LB related information associated with it.
* \a user_data, if not NULL, contains opaque data meant to be consumed by the
* gRPC LB policy. Note that no all LB policies support \a user_data as input.
* Those who don't will simply ignore it and will correspondingly return NULL in
* their namesake pick() output argument. */
typedef struct grpc_lb_address {
grpc_resolved_address *resolved_address;
void *user_data;
} grpc_lb_address;
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
grpc_lb_address *addresses;
size_t num_addresses;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;

@ -76,9 +76,9 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
* rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
* At this point we can transition to a new RR instance safely, which is done
* once again via \a rr_handover().
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which
* is done once again via \a rr_handover().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@ -96,6 +96,8 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
#include <errno.h>
#include <string.h>
#include <grpc/byte_buffer_reader.h>
@ -109,18 +111,57 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/static_metadata.h"
int grpc_lb_glb_trace = 0;
static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
size_t num_addresses) {
/* free "resolved" addresses memblock */
gpr_free(lb_addresses->resolved_address);
for (size_t i = 0; i < num_addresses; ++i) {
if (lb_addresses[i].user_data != NULL) {
GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
}
}
gpr_free(lb_addresses);
}
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static void initial_metadata_add_lb_token(
grpc_metadata_batch *initial_metadata,
grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != NULL);
GPR_ASSERT(lb_token != NULL);
grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
lb_token);
}
typedef struct wrapped_rr_closure_arg {
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
/* the pick's initial metadata, kept in order to append the LB token for the
* pick */
grpc_metadata_batch *initial_metadata;
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
grpc_connected_subchannel **target;
/* the LB token associated with the pick */
grpc_mdelem *lb_token;
/* storage for the lb token initial metadata mdelem */
grpc_linked_mdelem *lb_token_mdelem_storage;
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
@ -143,6 +184,11 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
gpr_free(wc_arg->owning_pending_node);
}
@ -164,6 +210,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
/* storage for the lb token initial metadata mdelem */
grpc_linked_mdelem *lb_token_mdelem_storage;
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@ -180,20 +229,23 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
static void add_pending_pick(pending_pick **root,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
pp->pollent = pollent;
pp->pollent = pick_args->pollent;
pp->target = target;
pp->initial_metadata = initial_metadata;
pp->initial_metadata_flags = initial_metadata_flags;
pp->initial_metadata = pick_args->initial_metadata;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
*root = pp;
@ -252,6 +304,12 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
/** total number of valid addresses received in \a serverlist */
size_t num_ok_serverlist_addresses;
/** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
grpc_lb_address *lb_addresses;
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@ -279,58 +337,142 @@ struct rr_connectivity_data {
glb_lb_policy *glb_policy;
};
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
const grpc_grpclb_serverlist *serverlist,
glb_lb_policy *glb_policy) {
/* TODO(dgq): support mixed ip version */
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
serverlist->servers[i]->port);
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %zu of serverlist. Ignoring.",
server->port, idx);
}
return false;
}
size_t uri_path_len;
char *concat_ipports = gpr_strjoin_sep(
(const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
if (ip->size != 4 && ip->size != 16) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %zu of "
"serverlist. Ignoring",
ip->size, idx);
}
return false;
}
return true;
}
grpc_lb_policy_args args;
args.client_channel_factory = glb_policy->cc_factory;
args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
args.addresses->naddrs = serverlist->num_servers;
args.addresses->addrs =
gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
size_t out_addrs_idx = 0;
/* populate \a addresses according to \a serverlist. Returns the number of
* addresses successfully parsed and added to \a addresses */
static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
grpc_lb_address **lb_addresses) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_uri uri;
struct sockaddr_storage sa;
size_t sa_len;
uri.path = host_ports[i];
if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
args.addresses->addrs[out_addrs_idx].len = sa_len;
++out_addrs_idx;
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
}
if (num_valid == 0) {
return 0;
}
/* allocate the memory block for the "resolved" addresses. */
grpc_resolved_address *r_addrs_memblock =
gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
* Given that the validity tests are very cheap, they are performed again
* instead of marking the valid ones during the first pass, as this would
* incurr in an allocation due to the arbitrary number of server */
size_t addr_idx = 0;
for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
GPR_ASSERT(addr_idx < num_valid);
const grpc_grpclb_server *server = serverlist->servers[sl_idx];
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
/* address processing */
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address *ip = &server->ip_address;
lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
struct sockaddr_storage *sa =
(struct sockaddr_storage *)lb_addr->resolved_address->addr;
size_t *sa_len = &lb_addr->resolved_address->len;
*sa_len = 0;
if (ip->size == 4) {
struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
*sa_len = sizeof(struct sockaddr_in);
memset(addr4, 0, *sa_len);
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
*sa_len = sizeof(struct sockaddr_in6);
memset(addr6, 0, *sa_len);
addr6->sin6_family = AF_INET;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
GPR_ASSERT(*sa_len > 0);
/* lb token processing */
if (server->has_load_balance_token) {
const size_t lb_token_size =
GPR_ARRAY_SIZE(server->load_balance_token) - 1;
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_size);
lb_addr->user_data = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
host_ports[i]);
gpr_log(GPR_ERROR,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
grpc_sockaddr_to_uri((struct sockaddr *)sa));
lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
++addr_idx;
}
GPR_ASSERT(addr_idx == num_valid);
*lb_addresses = lb_addrs;
return num_valid;
}
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
const grpc_grpclb_serverlist *serverlist,
glb_lb_policy *glb_policy) {
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
const size_t num_ok_addresses =
process_serverlist(serverlist, &args.addresses);
args.num_addresses = num_ok_addresses;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
gpr_free(concat_ipports);
for (size_t i = 0; i < serverlist->num_servers; i++) {
gpr_free(host_ports[i]);
if (glb_policy->lb_addresses != NULL) {
/* dispose of the previous version */
lb_addrs_destroy(glb_policy->lb_addresses,
glb_policy->num_ok_serverlist_addresses);
}
gpr_free(host_ports);
gpr_free(args.addresses->addrs);
gpr_free(args.addresses);
glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
glb_policy->lb_addresses = args.addresses;
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
GRPC_ERROR_REF(error);
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@ -345,8 +487,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
&glb_policy->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
glb_policy->rr_connectivity->state, error,
"rr_handover");
glb_policy->rr_connectivity->state,
GRPC_ERROR_REF(error), "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
/* flush pending ops */
@ -359,9 +501,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
pp->initial_metadata, pp->initial_metadata_flags,
pp->target, &pp->wrapped_on_complete);
const grpc_lb_policy_pick_args pick_args = {
pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
pp->lb_token_mdelem_storage};
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
(void **)&pp->wrapped_on_complete_arg.lb_token,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@ -378,13 +523,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
&pping->wrapped_notify);
pping->wrapped_notify_arg.owning_pending_node = pping;
}
GRPC_ERROR_UNREF(error);
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
@ -398,8 +543,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, error,
"rr_connectivity_changed");
rr_conn_data->state, GRPC_ERROR_REF(error),
"glb_rr_connectivity_changed");
/* resubscribe */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
@ -408,7 +553,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(rr_conn_data);
}
}
GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@ -418,31 +562,43 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims
* they are LB services. It's the resolver's responsibility to make sure this
* they are LB services. It's the resolver's responsibility to make sure
* this
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
if (args->addresses->naddrs == 0) {
if (args->num_addresses == 0) {
return NULL;
}
/* construct a target from the args->addresses, in the form
if (args->addresses[0].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
/* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
addr_strs[0] =
grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
for (size_t i = 1; i < args->addresses->naddrs; i++) {
GPR_ASSERT(grpc_sockaddr_to_string(
&addr_strs[i],
(const struct sockaddr *)&args->addresses->addrs[i],
true) == 0);
char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
addr_strs[0] = grpc_sockaddr_to_uri(
(const struct sockaddr *)&args->addresses[0].resolved_address->addr);
for (size_t i = 1; i < args->num_addresses; i++) {
if (args->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
GPR_ASSERT(
grpc_sockaddr_to_string(
&addr_strs[i],
(const struct sockaddr *)&args->addresses[i].resolved_address->addr,
true) == 0);
}
size_t uri_path_len;
char *target_uri_str = gpr_strjoin_sep(
(const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
(const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@ -450,7 +606,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
for (size_t i = 0; i < args->addresses->naddrs; i++) {
for (size_t i = 0; i < args->num_addresses; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
@ -463,7 +619,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
glb_policy->rr_connectivity = rr_connectivity;
@ -486,6 +642,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
lb_addrs_destroy(glb_policy->lb_addresses,
glb_policy->num_ok_serverlist_addresses);
gpr_free(glb_policy);
}
@ -546,7 +705,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@ -576,7 +734,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@ -603,12 +760,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
grpc_exec_ctx_sched(
exec_ctx, on_complete,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"),
NULL);
return 1;
}
gpr_mu_lock(&glb_policy->mu);
int r;
@ -621,28 +787,34 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
glb_policy->wc_arg.wrapped_closure = on_complete;
glb_policy->wc_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
glb_policy->wc_arg.owning_pending_node = NULL;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
initial_metadata, initial_metadata_flags, target,
r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
(void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
if (r != 0) {
/* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
* policy and notify the original callback */
glb_policy->wc_arg.wrapped_closure = NULL;
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)glb_policy->wc_arg.rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
GRPC_ERROR_NONE, NULL);
/* add the load reporting initial metadata */
initial_metadata_add_lb_token(
pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
initial_metadata_flags, target, on_complete);
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
@ -702,9 +874,6 @@ typedef struct lb_client_data {
/* called once initial metadata's been sent */
grpc_closure md_sent;
/* called once initial metadata's been received */
grpc_closure md_rcvd;
/* called once the LoadBalanceRequest has been sent to the LB server. See
* src/proto/grpc/.../load_balancer.proto */
grpc_closure req_sent;
@ -741,7 +910,6 @@ typedef struct lb_client_data {
} lb_client_data;
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@ -756,7 +924,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
gpr_mu_init(&lb_client->mu);
grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@ -855,23 +1022,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
&lb_client->md_rcvd);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
GPR_ASSERT(lb_client->lb_call);
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = lb_client->request_payload;
@ -886,11 +1036,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
GPR_ASSERT(lb_client->lb_call);
grpc_op ops[1];
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &lb_client->response_payload;
op->flags = 0;
@ -909,8 +1066,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op *op = ops;
if (lb_client->response_payload != NULL) {
/* Received data from the LB server. Look inside
* lb_client->response_payload, for
* a serverlist. */
* lb_client->response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@ -947,7 +1103,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
* rr_connectivity_changed) */
* glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
@ -1010,8 +1166,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
lb_client->status, lb_client->status_details,
lb_client->status_details_capacity);
}
/* TODO(dgq): deal with stream termination properly (fire up another one? fail
* the original call?) */
/* TODO(dgq): deal with stream termination properly (fire up another one?
* fail the original call?) */
}
/* Code wiring the policy with the rest of the core */

@ -57,6 +57,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->num_servers++;
@ -69,6 +70,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->servers[dec_arg->decoding_idx++] = server;
@ -118,6 +120,7 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
grpc_grpclb_initial_response *initial_res =
@ -145,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = true;
status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
@ -152,6 +156,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
status =
pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}

@ -45,6 +45,7 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;

@ -31,10 +31,11 @@
*
*/
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.5-dev */
/* Generated by nanopb-0.3.7-dev */
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@ -72,7 +73,7 @@ const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
};
const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@ -84,7 +85,7 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
};
const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@ -116,3 +117,4 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
#endif
/* @@protoc_insertion_point(eof) */

@ -31,11 +31,12 @@
*
*/
/* Automatically generated nanopb header */
/* Generated by nanopb-0.3.5-dev */
/* Generated by nanopb-0.3.7-dev */
#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#include "third_party/nanopb/pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@ -52,6 +53,7 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t client_rpc_errors;
bool has_dropped_requests;
int64_t dropped_requests;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
typedef struct _grpc_lb_v1_Duration {
@ -59,22 +61,26 @@ typedef struct _grpc_lb_v1_Duration {
int64_t seconds;
bool has_nanos;
int32_t nanos;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
} grpc_lb_v1_Duration;
typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
bool has_name;
char name[128];
/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
} grpc_lb_v1_InitialLoadBalanceRequest;
typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
typedef struct _grpc_lb_v1_Server {
bool has_ip_address;
char ip_address[46];
grpc_lb_v1_Server_ip_address_t ip_address;
bool has_port;
int32_t port;
bool has_load_balance_token;
char load_balance_token[64];
char load_balance_token[65];
bool has_drop_request;
bool drop_request;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@ -82,6 +88,7 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
char load_balancer_delegate[64];
bool has_client_stats_report_interval;
grpc_lb_v1_Duration client_stats_report_interval;
/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct _grpc_lb_v1_LoadBalanceRequest {
@ -89,12 +96,14 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
bool has_client_stats;
grpc_lb_v1_ClientStats client_stats;
/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
typedef struct _grpc_lb_v1_ServerList {
pb_callback_t servers;
bool has_expiration_interval;
grpc_lb_v1_Duration expiration_interval;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
} grpc_lb_v1_ServerList;
typedef struct _grpc_lb_v1_LoadBalanceResponse {
@ -102,6 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
bool has_server_list;
grpc_lb_v1_ServerList server_list;
/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
} grpc_lb_v1_LoadBalanceResponse;
/* Default values for struct fields */
@ -114,7 +124,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@ -122,7 +132,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStats_total_requests_tag 1
@ -135,7 +145,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_request_tag 4
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
@ -161,7 +171,8 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#define grpc_lb_v1_ClientStats_size 33
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
#define grpc_lb_v1_Server_size 127
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
#define grpc_lb_v1_Server_size 98
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@ -174,5 +185,6 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#ifdef __cplusplus
} /* extern "C" */
#endif
/* @@protoc_insertion_point(eof) */
#endif

@ -199,10 +199,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@ -225,13 +223,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollent = pollent;
pp->pollent = pick_args->pollent;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@ -443,20 +441,25 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
if (args->num_addresses == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
p->subchannels =
gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
for (size_t i = 0; i < args->num_addresses; i++) {
if (args->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
sc_args.addr =
(struct sockaddr *)(args->addresses[i].resolved_address->addr);
sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);

@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@ -76,15 +77,32 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
/* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
/* output argument where to store the pick()ed user_data. It'll be NULL if no
* such data is present or there's an error (the definite test for errors is
* \a target being NULL). */
void **user_data;
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
grpc_connected_subchannel **target;
/* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
/* references namesake entry in subchannel_data */
void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@ -102,12 +120,21 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
/** the subchannel's target user data */
void *user_data;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
/** total number of addresses received at creation time */
size_t num_addresses;
/** array holding the borrowed and opaque pointers to incoming user data, one
* per incoming address. These individual pointers will be returned as-is in
* successful picks. */
void **user_data_pointers;
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@ -166,16 +193,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
(void *)p->ready_list_last_pick,
(void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
grpc_subchannel *sc) {
subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
new_elem->subchannel = sc;
memset(new_elem, 0, sizeof(ready_list));
new_elem->subchannel = sd->subchannel;
new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@ -189,7 +219,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
(void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@ -216,8 +247,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
}
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
node->subchannel);
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
(void *)node->subchannel);
}
node->next = NULL;
@ -229,9 +260,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
@ -251,6 +281,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
gpr_free(p->user_data_pointers);
gpr_free(p);
}
@ -337,7 +369,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@ -361,38 +393,40 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
/* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
*user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
selected);
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
(void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollent = pollent;
pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->user_data = user_data;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@ -421,7 +455,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@ -433,12 +467,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
*pp->user_data = selected->user_data;
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
(void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
@ -570,20 +606,25 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
if (args->num_addresses == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
p->subchannels =
gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
p->num_addresses = args->num_addresses;
p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses);
memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr;
sc_args.addr_len = args->addresses[i].resolved_address->len;
p->user_data_pointers[i] = args->addresses[i].user_data;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@ -595,12 +636,14 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
sd->user_data = p->user_data_pointers[i];
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
/* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;

@ -175,10 +175,18 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_args lb_policy_args;
result = grpc_resolver_result_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = addresses;
lb_policy_args.num_addresses = addresses->naddrs;
lb_policy_args.addresses =
gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
memset(lb_policy_args.addresses, 0,
sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
for (size_t i = 0; i < addresses->naddrs; ++i) {
lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
}
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
gpr_free(lb_policy_args.addresses);
if (lb_policy != NULL) {
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");

@ -125,10 +125,18 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_resolver_result *result = grpc_resolver_result_create();
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = r->addresses;
lb_policy_args.num_addresses = r->addresses->naddrs;
lb_policy_args.addresses =
gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
memset(lb_policy_args.addresses, 0,
sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) {
lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
}
lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
gpr_free(lb_policy_args.addresses);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;

@ -278,7 +278,7 @@ static void ref_md_locked(mdtab_shard *shard,
internal_metadata *md DEBUG_ARGS) {
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"ELM REF:%p:%d->%d: '%s' = '%s'", md,
"ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
gpr_atm_no_barrier_load(&md->refcnt),
gpr_atm_no_barrier_load(&md->refcnt) + 1,
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
@ -566,7 +566,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey,
shard->elems[idx] = md;
gpr_mu_init(&md->mu_user_data);
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md,
gpr_log(GPR_DEBUG, "ELM NEW:%p:%zu: '%s' = '%s'", (void *)md,
gpr_atm_no_barrier_load(&md->refcnt),
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
@ -639,7 +639,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
if (is_mdelem_static(gmd)) return gmd;
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"ELM REF:%p:%d->%d: '%s' = '%s'", md,
"ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
gpr_atm_no_barrier_load(&md->refcnt),
gpr_atm_no_barrier_load(&md->refcnt) + 1,
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
@ -649,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) {
this function - meaning that no adjustment to mdtab_free is necessary,
simplifying the logic here to be just an atomic increment */
/* use C assert to have this removed in opt builds */
assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
GPR_ASSERT(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
return gmd;
}
@ -660,14 +660,16 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
if (is_mdelem_static(gmd)) return;
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"ELM UNREF:%p:%d->%d: '%s' = '%s'", md,
"ELM UNREF:%p:%zu->%zu: '%s' = '%s'", (void *)md,
gpr_atm_no_barrier_load(&md->refcnt),
gpr_atm_no_barrier_load(&md->refcnt) - 1,
grpc_mdstr_as_c_string((grpc_mdstr *)md->key),
grpc_mdstr_as_c_string((grpc_mdstr *)md->value));
#endif
uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash);
if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1);
GPR_ASSERT(prev_refcount >= 1);
if (1 == prev_refcount) {
/* once the refcount hits zero, some other thread can come along and
free md at any time: it's unsafe from this point on to access it */
mdtab_shard *shard =
@ -676,10 +678,12 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) {
}
}
const char *grpc_mdstr_as_c_string(grpc_mdstr *s) {
const char *grpc_mdstr_as_c_string(const grpc_mdstr *s) {
return (const char *)GPR_SLICE_START_PTR(s->slice);
}
size_t grpc_mdstr_length(const grpc_mdstr *s) { return GRPC_MDSTR_LENGTH(s); }
grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) {
internal_string *s = (internal_string *)gs;
if (is_mdstr_static(gs)) return gs;

@ -147,7 +147,7 @@ void grpc_mdelem_unref(grpc_mdelem *md);
/* Recover a char* from a grpc_mdstr. The returned string is null terminated.
Does not promise that the returned string has no embedded nulls however. */
const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
const char *grpc_mdstr_as_c_string(const grpc_mdstr *s);
#define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice))

@ -1,6 +1,6 @@
grpc.lb.v1.InitialLoadBalanceRequest.name max_size:128
grpc.lb.v1.InitialLoadBalanceResponse.client_config max_size:64
grpc.lb.v1.InitialLoadBalanceResponse.load_balancer_delegate max_size:64
grpc.lb.v1.Server.ip_address max_size:46
grpc.lb.v1.Server.load_balance_token max_size:64
grpc.lb.v1.Server.ip_address max_size:16
grpc.lb.v1.Server.load_balance_token max_size:65
load_balancer.proto no_unions:true

@ -32,7 +32,6 @@ syntax = "proto3";
package grpc.lb.v1;
message Duration {
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
int64 seconds = 1;
@ -93,16 +92,11 @@ message LoadBalanceResponse {
}
message InitialLoadBalanceResponse {
oneof initial_response_type {
// TODO(zhangkun83): ClientConfig not yet defined
//ClientConfig client_config = 1;
// This is an application layer redirect that indicates the client should
// use the specified server for load balancing. When this field is set in
// the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method.
string load_balancer_delegate = 2;
}
// This is an application layer redirect that indicates the client should use
// the specified server for load balancing. When this field is non-empty in
// the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method.
string load_balancer_delegate = 1;
// This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is
@ -125,14 +119,17 @@ message ServerList {
}
message Server {
// A resolved address and port for the server. The IP address string may
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
string ip_address = 1;
bytes ip_address = 1;
// A resolved port number for the server.
int32 port = 2;
// An opaque token that is passed from the client to the server in metadata.
// The server may expect this token to indicate that the request from the
// client was load balanced.
// An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system.
string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client

@ -31,10 +31,12 @@
*
*/
#include <grpc++/impl/codegen/config.h>
#include <gtest/gtest.h>
#include <string>
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version
namespace grpc {
@ -45,8 +47,28 @@ using grpc::lb::v1::LoadBalanceResponse;
class GrpclbTest : public ::testing::Test {};
grpc::string Ip4ToPackedString(const char* ip_str) {
struct in_addr ip4;
GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
}
grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) {
char ip_str[46] = {0};
int af = -1;
if (pb_ip.size == 4) {
af = AF_INET;
} else if (pb_ip.size == 16) {
af = AF_INET6;
} else {
abort();
}
GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL);
return ip_str;
}
TEST_F(GrpclbTest, CreateRequest) {
const std::string service_name = "AServiceName";
const grpc::string service_name = "AServiceName";
LoadBalanceRequest request;
grpc_grpclb_request* c_req = grpc_grpclb_request_create(service_name.c_str());
gpr_slice slice = grpc_grpclb_request_encode(c_req);
@ -65,7 +87,7 @@ TEST_F(GrpclbTest, ParseInitialResponse) {
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
client_stats_report_interval->set_nanos(456);
const std::string encoded_response = response.SerializeAsString();
const grpc::string encoded_response = response.SerializeAsString();
gpr_slice encoded_slice =
gpr_slice_from_copied_string(encoded_response.c_str());
@ -82,29 +104,31 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
LoadBalanceResponse response;
auto* serverlist = response.mutable_server_list();
auto* server = serverlist->add_servers();
server->set_ip_address("127.0.0.1");
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
server->set_drop_request(true);
server = response.mutable_server_list()->add_servers();
server->set_ip_address("10.0.0.1");
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
server->set_drop_request(false);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
const std::string encoded_response = response.SerializeAsString();
gpr_slice encoded_slice =
gpr_slice_from_copied_string(encoded_response.c_str());
const grpc::string encoded_response = response.SerializeAsString();
const gpr_slice encoded_slice = gpr_slice_from_copied_buffer(
encoded_response.data(), encoded_response.size());
grpc_grpclb_serverlist* c_serverlist =
grpc_grpclb_response_parse_serverlist(encoded_slice);
ASSERT_EQ(c_serverlist->num_servers, 2ul);
EXPECT_TRUE(c_serverlist->servers[0]->has_ip_address);
EXPECT_TRUE(strcmp(c_serverlist->servers[0]->ip_address, "127.0.0.1") == 0);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
EXPECT_TRUE(c_serverlist->servers[0]->drop_request);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_TRUE(strcmp(c_serverlist->servers[1]->ip_address, "10.0.0.1") == 0);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
EXPECT_FALSE(c_serverlist->servers[1]->drop_request);

@ -37,7 +37,10 @@
#include <cstring>
#include <string>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
@ -46,9 +49,11 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc++/impl/codegen/config.h>
extern "C" {
#include "src/core/ext/client_config/client_channel.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/support/tmpfile.h"
#include "src/core/lib/surface/channel.h"
@ -61,16 +66,19 @@ extern "C" {
#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
#define NUM_BACKENDS 4
#define PAYLOAD "hello you"
// TODO(dgq): Other scenarios in need of testing:
// - Send an empty serverlist update and verify that the client request blocks
// until a new serverlist with actual contents is available.
// - Send identical serverlist update
// - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
// - Test reception of invalid serverlist
// - Test pinging
// - Test against a non-LB server. That server should return UNIMPLEMENTED and
// the call should fail.
// - Random LB server closing the stream unexpectedly.
// - Test using DNS-resolvable names (localhost?)
namespace grpc {
namespace {
@ -105,8 +113,8 @@ static gpr_slice build_response_payload_slice(
int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
// server_list {
// servers {
// ip_address: "127.0.0.1"
// port: ...
// ip_address: <in_addr/6 bytes of an IP>
// port: <16 bit uint>
// load_balance_token: "token..."
// }
// ...
@ -125,21 +133,21 @@ static gpr_slice build_response_payload_slice(
}
for (size_t i = 0; i < nports; i++) {
auto *server = serverlist->add_servers();
server->set_ip_address(host);
// TODO(dgq): test ipv6
struct in_addr ip4;
GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1);
server->set_ip_address(
grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4)));
server->set_port(ports[i]);
// The following long long int cast is meant to work around the
// disfunctional implementation of std::to_string in gcc 4.4, which doesn't
// have a version for int but does have one for long long int.
server->set_load_balance_token("token" +
std::to_string((long long int)ports[i]));
string token_data = "token" + std::to_string((long long int)ports[i]);
token_data.resize(64, '-');
server->set_load_balance_token(token_data);
}
gpr_log(GPR_INFO, "generating response: %s",
response.ShortDebugString().c_str());
const gpr_slice response_slice =
gpr_slice_from_copied_string(response.SerializeAsString().c_str());
return response_slice;
const grpc::string &enc_resp = response.SerializeAsString();
return gpr_slice_from_copied_buffer(enc_resp.data(), enc_resp.size());
}
static void drain_cq(grpc_completion_queue *cq) {
@ -181,20 +189,9 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
cq_verify(cqv);
gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
// make sure we've received the initial metadata from the grpclb request.
GPR_ASSERT(request_metadata_recv.count > 0);
GPR_ASSERT(request_metadata_recv.metadata != NULL);
// receive request for backends
op = ops;
@ -208,9 +205,36 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
CQ_EXPECT_COMPLETION(cqv, tag(202), 1);
cq_verify(cqv);
gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
// TODO(dgq): validate request.
// validate initial request.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, request_payload_recv);
gpr_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc::lb::v1::LoadBalanceRequest request;
request.ParseFromArray(GPR_SLICE_START_PTR(request_payload_slice),
GPR_SLICE_LENGTH(request_payload_slice));
GPR_ASSERT(request.has_initial_request());
GPR_ASSERT(request.initial_request().name() == "load.balanced.service.name");
gpr_slice_unref(request_payload_slice);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(request_payload_recv);
gpr_slice response_payload_slice;
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
for (int i = 0; i < 2; i++) {
if (i == 0) {
// First half of the ports.
@ -303,6 +327,16 @@ static void start_backend_server(server_fixture *sf) {
return;
}
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
// The following long long int cast is meant to work around the
// disfunctional implementation of std::to_string in gcc 4.4, which doesn't
// have a version for int but does have one for long long int.
string expected_token = "token" + std::to_string((long long int)sf->port);
expected_token.resize(64, '-');
GPR_ASSERT(contains_metadata(&request_metadata_recv,
"load-reporting-initial",
expected_token.c_str()));
gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
op = ops;
@ -321,8 +355,7 @@ static void start_backend_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
bool exit = false;
gpr_slice response_payload_slice =
gpr_slice_from_copied_string("hello you");
gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD);
while (!exit) {
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
@ -474,10 +507,9 @@ static void perform_request(client_fixture *cf) {
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
peer = grpc_call_get_peer(c);
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
cq_verify(cqv);
gpr_free(peer);
GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);
@ -583,27 +615,30 @@ static void fork_lb_server(void *arg) {
tf->lb_server_update_delay_ms);
}
static void setup_test_fixture(test_fixture *tf,
int lb_server_update_delay_ms) {
tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
static test_fixture setup_test_fixture(int lb_server_update_delay_ms) {
test_fixture tf;
memset(&tf, 0, sizeof(tf));
tf.lb_server_update_delay_ms = lb_server_update_delay_ms;
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
for (int i = 0; i < NUM_BACKENDS; ++i) {
setup_server("127.0.0.1", &tf->lb_backends[i]);
gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
&tf->lb_backends[i], &options);
setup_server("127.0.0.1", &tf.lb_backends[i]);
gpr_thd_new(&tf.lb_backends[i].tid, fork_backend_server, &tf.lb_backends[i],
&options);
}
setup_server("127.0.0.1", &tf->lb_server);
gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
setup_server("127.0.0.1", &tf.lb_server);
gpr_thd_new(&tf.lb_server.tid, fork_lb_server, &tf.lb_server, &options);
char *server_uri;
gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
tf->lb_server.servers_hostport);
setup_client(server_uri, &tf->client);
tf.lb_server.servers_hostport);
setup_client(server_uri, &tf.client);
gpr_free(server_uri);
return tf;
}
static void teardown_test_fixture(test_fixture *tf) {
@ -614,19 +649,13 @@ static void teardown_test_fixture(test_fixture *tf) {
teardown_server(&tf->lb_server);
}
// The LB server will send two updates: batch 1 and batch 2. Each batch
// contains
// two addresses, both of a valid and running backend server. Batch 1 is
// readily
// available and provided as soon as the client establishes the streaming
// call.
// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
// milliseconds.
// The LB server will send two updates: batch 1 and batch 2. Each batch contains
// two addresses, both of a valid and running backend server. Batch 1 is readily
// available and provided as soon as the client establishes the streaming call.
// Batch 2 is sent after a delay of \a lb_server_update_delay_ms milliseconds.
static test_fixture test_update(int lb_server_update_delay_ms) {
gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
test_fixture tf;
memset(&tf, 0, sizeof(tf));
setup_test_fixture(&tf, lb_server_update_delay_ms);
test_fixture tf = setup_test_fixture(lb_server_update_delay_ms);
perform_request(
&tf.client); // "consumes" 1st backend server of 1st serverlist
perform_request(
@ -642,13 +671,7 @@ static test_fixture test_update(int lb_server_update_delay_ms) {
return tf;
}
} // namespace
} // namespace grpc
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
TEST(GrpclbTest, Updates) {
grpc::test_fixture tf_result;
// Clients take a bit over one second to complete a call (the last part of the
// call sleeps for 1 second while verifying the client's completion queue is
@ -683,7 +706,18 @@ int main(int argc, char **argv) {
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
}
TEST(GrpclbTest, InvalidAddressInServerlist) {}
} // namespace
} // namespace grpc
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_test_init(argc, argv);
grpc_init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
return 0;
return result;
}

@ -1 +1 @@
Subproject commit f8ac463766281625ad710900479130c7fcb4d63b
Subproject commit 68a86e96481e6bea987df8de47027847b30c325b

@ -123,7 +123,7 @@ popd
# this should be the same version as the submodule we compile against
# ideally we'd update this as a template to ensure that
pip install protobuf==3.0.0b2
pip install protobuf==3.0.0
pushd "$(dirname $INPUT_PROTO)" > /dev/null

@ -44,7 +44,7 @@ cat << EOF | awk '{ print $1 }' | sort > $want_submodules
c880e42ba1c8032d4cdde2aba0541d8a9d9fa2e9 third_party/boringssl (version_for_cocoapods_2.0-100-gc880e42)
05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
c99458533a9b4c743ed51537e25989ea55944908 third_party/googletest (release-1.7.0)
f8ac463766281625ad710900479130c7fcb4d63b third_party/nanopb (nanopb-0.3.4-29-gf8ac463)
68a86e96481e6bea987df8de47027847b30c325b third_party/nanopb (nanopb-0.3.6-6-g68a86e9)
bba446bbf2ac7b0b9923d4eb07d5acd0665a8cf0 third_party/protobuf (v3.0.0-beta-4-160-gbba446b)
50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
bcad91771b7f0bff28a1cac1981d7ef2b9bcef3c third_party/thrift

Loading…
Cancel
Save