Merge pull request #11761 from dgquintas/lb_update_connector_fix

RR: fix subchannel list handing
pull/9090/merge
David G. Quintas 8 years ago committed by GitHub
commit 0c9ce72218
  1. 159
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  2. 7
      src/core/ext/filters/client_channel/subchannel_index.c
  3. 12
      src/core/ext/filters/client_channel/subchannel_index.h
  4. 5
      test/cpp/end2end/client_lb_end2end_test.cc

@ -142,6 +142,21 @@ struct rr_subchannel_list {
bool shutting_down;
};
static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p,
size_t num_subchannels) {
rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
subchannel_list->policy = p;
subchannel_list->subchannels =
gpr_zalloc(sizeof(subchannel_data) * num_subchannels);
subchannel_list->num_subchannels = num_subchannels;
gpr_ref_init(&subchannel_list->refcount, 1);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels",
(void *)p, (void *)subchannel_list, (unsigned long)num_subchannels);
}
return subchannel_list;
}
static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list) {
GPR_ASSERT(subchannel_list->shutting_down);
@ -171,9 +186,9 @@ static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
gpr_ref_non_zero(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
(unsigned long)(count - 1), (unsigned long)count);
(unsigned long)(count - 1), (unsigned long)count, reason);
}
}
@ -183,9 +198,9 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
const bool done = gpr_unref(&subchannel_list->refcount);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list,
(unsigned long)(count + 1), (unsigned long)count);
(unsigned long)(count + 1), (unsigned long)count, reason);
}
if (done) {
rr_subchannel_list_destroy(exec_ctx, subchannel_list);
@ -194,19 +209,13 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
* watcher's callback will ultimately unref \a subchannel_list. */
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
static void rr_subchannel_list_shutdown_and_unref(
grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list,
const char *reason) {
if (subchannel_list->shutting_down) {
GPR_ASSERT(!subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
(void *)subchannel_list);
}
return;
};
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
(void *)subchannel_list);
gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)",
(void *)subchannel_list->policy, (void *)subchannel_list, reason);
}
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
@ -214,10 +223,12 @@ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"Unsubscribing from subchannel %p as part of shutting down "
gpr_log(
GPR_DEBUG,
"[RR %p] Unsubscribing from subchannel %p as part of shutting down "
"subchannel_list %p",
(void *)sd->subchannel, (void *)subchannel_list);
(void *)subchannel_list->policy, (void *)sd->subchannel,
(void *)subchannel_list);
}
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
@ -294,7 +305,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
(void *)pol, (void *)pol);
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p);
@ -303,7 +315,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p",
(void *)pol, (void *)pol);
}
p->shutdown = true;
pending_pick *pp;
@ -318,9 +331,18 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
const bool latest_is_current =
p->subchannel_list == p->latest_pending_subchannel_list;
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_shutdown");
p->subchannel_list = NULL;
if (!latest_is_current && p->latest_pending_subchannel_list != NULL &&
!p->latest_pending_subchannel_list->shutting_down) {
rr_subchannel_list_shutdown_and_unref(exec_ctx,
p->latest_pending_subchannel_list,
"sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = NULL;
}
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@ -376,8 +398,8 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &p->subchannel_list->subchannels[i];
GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked");
rr_subchannel_list_ref(sd->subchannel_list, "started_picking");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@ -399,7 +421,7 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
if (p->subchannel_list != NULL) {
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
@ -415,8 +437,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(
GPR_DEBUG,
"[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, "
"INDEX %lu)",
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %lu)",
(void *)p, (void *)sd->subchannel, (void *)*target,
(void *)sd->subchannel_list, (unsigned long)next_ready_index);
}
@ -545,22 +567,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"pol_shutdown+started_picking");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return;
}
if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription.
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
// corresponds to the unsubscription. The unrefs correspond to the picking
// ref (start_picking_locked or update_started_picking).
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sl_shutdown+started_picking");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking");
return;
}
// Dispose of outdated subchannel lists.
if (sd->subchannel_list != p->subchannel_list &&
sd->subchannel_list != p->latest_pending_subchannel_list) {
// sd belongs to an outdated subchannel_list: get rid of it.
rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated");
rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list,
"sl_outdated");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated");
return;
}
// Now that we're inside the combiner, copy the pending connectivity
@ -583,7 +610,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
// the policy is shutting down. Flush all the pending picks...
pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@ -592,8 +619,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp);
}
}
/* unref the "rr_connectivity" weak ref from start_picking */
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown");
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sd_shutdown+started_picking");
// unref the "rr_connectivity_update" weak ref from start_picking.
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"rr_connectivity_sd_shutdown");
} else { // sd not in SHUTDOWN
@ -618,10 +646,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_update_connectivity");
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_phase_out_shutdown");
}
p->subchannel_list = sd->subchannel_list;
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
/* at this point we know there's at least one suitable subchannel. Go
@ -632,8 +660,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
subchannel_data *selected =
&p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
// if the selected subchannel is going to be used for the pending
// picks, update the last picked pointer
update_last_ready_subchannel_index_locked(p, next_ready_index);
}
pending_pick *pp;
@ -647,16 +675,17 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)",
(void *)selected->subchannel,
(unsigned long)next_ready_index);
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
"(subchannel_list %p, index %lu)",
(void *)p, (void *)selected->subchannel,
(void *)p->subchannel_list, (unsigned long)next_ready_index);
}
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
/* renew notification: reuses the "rr_connectivity" weak ref on the policy
* as well as the sd->subchannel_list ref. */
/* renew notification: reuses the "rr_connectivity_update" weak ref on the
* policy as well as the sd->subchannel_list ref. */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@ -714,8 +743,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
} else {
// otherwise, keep using the current subchannel list (ignore this update).
gpr_log(GPR_ERROR,
"No valid LB addresses channel arg for Round Robin %p update, "
"ignoring.",
"[RR %p] No valid LB addresses channel arg for update, ignoring.",
(void *)p);
}
return;
@ -731,24 +759,27 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_update");
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_empty_update");
p->subchannel_list = NULL;
}
return;
}
size_t subchannel_index = 0;
rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
subchannel_list->policy = p;
subchannel_list->subchannels =
gpr_zalloc(sizeof(subchannel_data) * num_addrs);
subchannel_list->num_subchannels = num_addrs;
gpr_ref_init(&subchannel_list->refcount, 1);
p->latest_pending_subchannel_list = subchannel_list;
rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs);
if (p->latest_pending_subchannel_list != NULL && p->started_picking) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
(void *)subchannel_list, (unsigned long)num_addrs);
gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, about "
"to be "
"replaced by newer latest %p",
(void *)p, (void *)p->latest_pending_subchannel_list,
(void *)subchannel_list);
}
rr_subchannel_list_shutdown_and_unref(
exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash");
}
p->latest_pending_subchannel_list = subchannel_list;
grpc_subchannel_args sc_args;
/* We need to remove the LB addresses in order to be able to compare the
* subchannel keys of subchannels from a different batch of addresses. */
@ -772,11 +803,12 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG,
"index %lu: Created subchannel %p for address uri %s into "
gpr_log(
GPR_DEBUG,
"[RR %p] index %lu: Created subchannel %p for address uri %s into "
"subchannel_list %p",
(unsigned long)subchannel_index, (void *)subchannel, address_uri,
(void *)subchannel_list);
(void *)p, (unsigned long)subchannel_index, (void *)subchannel,
address_uri, (void *)subchannel_list);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
@ -815,10 +847,11 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"rr_update_before_started_picking");
}
p->subchannel_list = subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
}
@ -848,7 +881,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p,
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void *)p,
(unsigned long)p->subchannel_list->num_subchannels);
}
return &p->base;

@ -40,6 +40,8 @@ struct grpc_subchannel_key {
GPR_TLS_DECL(subchannel_index_exec_ctx);
static bool g_force_creation = false;
static void enter_ctx(grpc_exec_ctx *exec_ctx) {
GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0);
gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx);
@ -84,6 +86,7 @@ static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) {
int grpc_subchannel_key_compare(const grpc_subchannel_key *a,
const grpc_subchannel_key *b) {
if (g_force_creation) return false;
int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.filter_count > 0) {
@ -250,3 +253,7 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
leave_ctx(exec_ctx);
}
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
g_force_creation = force_creation;
}

@ -59,4 +59,16 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
/** \em TEST ONLY.
* If \a force_creation is true, all key comparisons will be false, resulting in
* new subchannels always being created. Otherwise, the keys will be compared as
* usual.
*
* This function is *not* threadsafe on purpose: it should *only* be used in
* test code.
*
* Tests using this function \em MUST run tests with and without \a
* force_creation set. */
void grpc_subchannel_index_test_only_set_force_creation(bool force_creation);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */

@ -35,6 +35,7 @@
extern "C" {
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
}
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -331,11 +332,15 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
}
for (const bool force_creation : {true, false}) {
grpc_subchannel_index_test_only_set_force_creation(force_creation);
gpr_log(GPR_INFO, "Force subchannel creation: %d", force_creation);
for (size_t i = 0; i < 1000; ++i) {
std::random_shuffle(ports.begin(), ports.end());
SetNextResolution(ports);
if (i % 10 == 0) SendRpc();
}
}
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
}

Loading…
Cancel
Save