Refactor code for generating balancer channel args.

pull/14455/head
Mark D. Roth 7 years ago
parent 824b21e13a
commit bd0f15119a
  1. 115
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 52
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
  3. 24
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
  4. 133
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
  5. 2
      test/cpp/end2end/grpclb_end2end_test.cc

@ -918,34 +918,8 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
// Helper function to construct a target info entry.
grpc_slice_hash_table_entry BalancerEntryCreate(const char* address,
const char* balancer_name) {
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
entry.value = gpr_strdup(balancer_name);
return entry;
}
// Comparison function used for slice_hash_table vtable.
int BalancerNameCmp(void* a, void* b) {
const char* a_str = static_cast<const char*>(a);
const char* b_str = static_cast<const char*>(b);
return strcmp(a_str, b_str);
}
/* Returns the channel args for the LB channel, used to create a bidirectional
* stream for the reception of load balancing updates.
*
* Inputs:
* - \a addresses: corresponding to the balancers.
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
grpc_lb_addresses* ExtractBalancerAddresses(
const grpc_lb_addresses* addresses) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
@ -955,9 +929,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
GPR_ASSERT(num_grpclb_addrs > 0);
grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
grpc_slice_hash_table_entry* targets_info_entries =
(grpc_slice_hash_table_entry*)gpr_zalloc(sizeof(*targets_info_entries) *
num_grpclb_addrs);
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (!addresses->addresses[i].is_balancer) continue;
@ -965,32 +936,71 @@ grpc_channel_args* BuildBalancerChannelArgs(
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
char* addr_str;
GPR_ASSERT(grpc_sockaddr_to_string(
&addr_str, &addresses->addresses[i].address, true) > 0);
targets_info_entries[lb_addresses_idx] =
BalancerEntryCreate(addr_str, addresses->addresses[i].balancer_name);
gpr_free(addr_str);
grpc_lb_addresses_set_address(
lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
addresses->addresses[i].address.len, false /* is balancer */,
addresses->addresses[i].balancer_name, nullptr /* user data */);
}
GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create(
num_grpclb_addrs, targets_info_entries, gpr_free, BalancerNameCmp);
gpr_free(targets_info_entries);
grpc_channel_args* lb_channel_args =
grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
response_generator, args);
grpc_arg lb_channel_addresses_arg =
grpc_lb_addresses_create_channel_arg(lb_addresses);
grpc_channel_args* result = grpc_channel_args_copy_and_add(
lb_channel_args, &lb_channel_addresses_arg, 1);
grpc_slice_hash_table_unref(targets_info);
grpc_channel_args_destroy(lb_channel_args);
return lb_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
* stream for the reception of load balancing updates.
*
* Inputs:
* - \a addresses: corresponding to the balancers.
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
// the LB channel.
GRPC_ARG_LB_POLICY_NAME,
// The channel arg for the server URI, since that will be different for
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI,
// The resolved addresses, which will be generated by the name resolver
// used in the LB channel. Note that the LB channel will use the fake
// resolver, so this won't actually generate a query to DNS (or some
// other name service). However, the addresses returned by the fake
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy, as per the special case logic in client_channel.c.
GRPC_ARG_LB_ADDRESSES,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
};
// Channel args to add.
const grpc_arg args_to_add[] = {
// New LB addresses.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
grpc_lb_addresses_create_channel_arg(lb_addresses),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator),
};
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
// Clean up.
grpc_lb_addresses_destroy(lb_addresses);
return result;
return new_args;
}
//
@ -1292,8 +1302,9 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) {
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_);
lb_channel_ = grpc_lb_policy_grpclb_create_lb_channel(
uri_str, client_channel_factory(), lb_channel_args);
lb_channel_ = grpc_client_channel_factory_create_channel(
client_channel_factory(), uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str);
}

@ -16,57 +16,9 @@
*
*/
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args(
grpc_channel_args* args) {
grpc_channel* lb_channel = grpc_client_channel_factory_create_channel(
client_channel_factory, lb_service_target_addresses,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args);
return lb_channel;
}
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
grpc_slice_hash_table* targets_info,
grpc_core::FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
const grpc_arg to_add[] = {
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator)};
/* We remove:
*
* - The channel arg for the LB policy name, since we want to use the default
* (pick_first) in this case.
*
* - The channel arg for the resolved addresses, since that will be generated
* by the name resolver used in the LB channel. Note that the LB channel
* will use the fake resolver, so this won't actually generate a query
* to DNS (or some other name service). However, the addresses returned by
* the fake resolver will have is_balancer=false, whereas our own
* addresses have is_balancer=true. We need the LB channel to return
* addresses with is_balancer=false so that it does not wind up recursively
* using the grpclb LB policy, as per the special case logic in
* client_channel.c.
*
* - The channel arg for the server URI, since that will be different for the
* LB channel than for the parent channel (the client channel factory will
* re-add this arg with the right value).
*
* - The fake resolver generator, because we are replacing it with the one
* from the grpclb policy, used to propagate updates to the LB channel. */
static const char* keys_to_remove[] = {
GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
return grpc_channel_args_copy_and_add_and_remove(
args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
GPR_ARRAY_SIZE(to_add));
return args;
}

@ -20,25 +20,15 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/slice/slice_hash_table.h"
/** Create the channel used for communicating with an LB service.
* Note that an LB *service* may be comprised of several LB *servers*.
*
* \a lb_service_target_addresses is the target URI containing the addresses
* from resolving the LB service's name (eg, ipv4:10.0.0.1:1234,10.2.3.4:9876).
* \a client_channel_factory will be used for the creation of the LB channel,
* alongside the channel args passed in \a args. */
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
/// Makes any necessary modifications to \a args for use in the grpclb
/// balancer channel.
///
/// Takes ownership of \a args.
///
/// Caller takes ownership of the returned args.
grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args(
grpc_channel_args* args);
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
grpc_slice_hash_table* targets_info,
grpc_core::FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \
*/

@ -16,11 +16,14 @@
*
*/
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -28,73 +31,81 @@
#include "src/core/lib/security/transport/lb_targets_info.h"
#include "src/core/lib/slice/slice_internal.h"
grpc_channel* grpc_lb_policy_grpclb_create_lb_channel(
const char* lb_service_target_addresses,
grpc_client_channel_factory* client_channel_factory,
static void destroy_balancer_name(void* balancer_name) {
gpr_free(balancer_name);
}
static grpc_slice_hash_table_entry targets_info_entry_create(
const char* address, const char* balancer_name) {
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
entry.value = gpr_strdup(balancer_name);
return entry;
}
static int balancer_name_cmp_fn(void* a, void* b) {
const char* a_str = static_cast<const char*>(a);
const char* b_str = static_cast<const char*>(b);
return strcmp(a_str, b_str);
}
static grpc_slice_hash_table* build_targets_info_table(
grpc_lb_addresses* addresses) {
grpc_slice_hash_table_entry* targets_info_entries =
static_cast<grpc_slice_hash_table_entry*>(
gpr_zalloc(sizeof(*targets_info_entries) * addresses->num_addresses));
for (size_t i = 0; i < addresses->num_addresses; ++i) {
char* addr_str;
GPR_ASSERT(grpc_sockaddr_to_string(
&addr_str, &addresses->addresses[i].address, true) > 0);
targets_info_entries[i] = targets_info_entry_create(
addr_str, addresses->addresses[i].balancer_name);
gpr_free(addr_str);
}
grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create(
addresses->num_addresses, targets_info_entries, destroy_balancer_name,
balancer_name_cmp_fn);
gpr_free(targets_info_entries);
return targets_info;
}
grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args(
grpc_channel_args* args) {
grpc_channel_args* new_args = args;
const char* args_to_remove[1];
size_t num_args_to_remove = 0;
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses* addresses =
static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
grpc_slice_hash_table* targets_info = build_targets_info_table(addresses);
args_to_add[num_args_to_add++] =
grpc_lb_targets_info_create_channel_arg(targets_info);
// Substitute the channel credentials with a version without call
// credentials: the load balancer is not necessarily trusted to handle
// bearer token credentials.
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args);
grpc_channel_credentials* creds_sans_call_creds = nullptr;
if (channel_credentials != nullptr) {
/* Substitute the channel credentials with a version without call
* credentials: the load balancer is not necessarily trusted to handle
* bearer token credentials */
static const char* keys_to_remove[] = {GRPC_ARG_CHANNEL_CREDENTIALS};
grpc_channel_credentials* creds_sans_call_creds =
creds_sans_call_creds =
grpc_channel_credentials_duplicate_without_call_credentials(
channel_credentials);
GPR_ASSERT(creds_sans_call_creds != nullptr);
grpc_arg args_to_add[] = {
grpc_channel_credentials_to_arg(creds_sans_call_creds)};
/* Create the new set of channel args */
new_args = grpc_channel_args_copy_and_add_and_remove(
args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
grpc_channel_credentials_unref(creds_sans_call_creds);
args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS;
args_to_add[num_args_to_add++] =
grpc_channel_credentials_to_arg(creds_sans_call_creds);
}
grpc_channel* lb_channel = grpc_client_channel_factory_create_channel(
client_channel_factory, lb_service_target_addresses,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
if (channel_credentials != nullptr) {
grpc_channel_args_destroy(new_args);
grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add);
// Clean up.
grpc_channel_args_destroy(args);
grpc_slice_hash_table_unref(targets_info);
if (creds_sans_call_creds != nullptr) {
grpc_channel_credentials_unref(creds_sans_call_creds);
}
return lb_channel;
}
grpc_channel_args* grpc_lb_policy_grpclb_build_lb_channel_args(
grpc_slice_hash_table* targets_info,
grpc_core::FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
const grpc_arg to_add[] = {
grpc_lb_targets_info_create_channel_arg(targets_info),
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator)};
/* We remove:
*
* - The channel arg for the LB policy name, since we want to use the default
* (pick_first) in this case.
*
* - The channel arg for the resolved addresses, since that will be generated
* by the name resolver used in the LB channel. Note that the LB channel
* will use the fake resolver, so this won't actually generate a query
* to DNS (or some other name service). However, the addresses returned by
* the fake resolver will have is_balancer=false, whereas our own
* addresses have is_balancer=true. We need the LB channel to return
* addresses with is_balancer=false so that it does not wind up recursively
* using the grpclb LB policy, as per the special case logic in
* client_channel.c.
*
* - The channel arg for the server URI, since that will be different for the
* LB channel than for the parent channel (the client channel factory will
* re-add this arg with the right value).
*
* - The fake resolver generator, because we are replacing it with the one
* from the grpclb policy, used to propagate updates to the LB channel. */
static const char* keys_to_remove[] = {
GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
/* Add the targets info table to be used for secure naming */
return grpc_channel_args_copy_and_add_and_remove(
args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
GPR_ARRAY_SIZE(to_add));
return result;
}

@ -58,6 +58,8 @@
// - Test handling of creation of faulty RR instance by having the LB return a
// serverlist with non-existent backends after having initially returned a
// valid one.
// - test using secure credentials and make sure we don't send call
// credentials to the balancer
//
// Findings from end to end testing to be covered here:
// - Handling of LB servers restart, including reconnection after backing-off

Loading…
Cancel
Save