Assume that subchannels start in state IDLE.

pull/12878/head
Mark D. Roth 8 years ago
parent e9b1083791
commit 6e5ce7288d
  1. 2
      include/grpc/impl/codegen/connectivity_state.h
  2. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 6
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 2
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  5. 41
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
  6. 2
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  7. 4
      src/core/ext/filters/client_channel/subchannel.h
  8. 3
      src/core/lib/transport/connectivity_state.cc
  9. 4
      test/cpp/end2end/client_lb_end2end_test.cc

@ -25,8 +25,6 @@ extern "C" {
/** Connectivity state of a channel. */
typedef enum {
/** channel has just been initialized */
GRPC_CHANNEL_INIT = -1,
/** channel is idle */
GRPC_CHANNEL_IDLE,
/** channel is connecting */

@ -611,7 +611,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
break;
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
@ -1790,7 +1789,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received.
switch (glb_policy->lb_channel_connectivity) {
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */

@ -312,6 +312,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
exec_ctx, p->subchannel_list, "pf_update_includes_selected");
}
p->subchannel_list = subchannel_list;
if (p->selected->connected_subchannel != NULL) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "pf_update_includes_selected");
}
p->selected = sd;
destroy_unselected_subchannels_locked(exec_ctx, p);
// If there was a previously pending update (which may or may
@ -442,8 +446,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
while (true) {
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_INIT:
GPR_UNREACHABLE_CODE(return );
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.

@ -334,6 +334,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data *sd) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
}
sd->prev_connectivity_state = sd->curr_connectivity_state;
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -451,7 +452,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel, and if the new

@ -137,34 +137,13 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
}
continue;
}
grpc_error *error;
// Get the connectivity state of the subchannel. Already existing ones may
// be in a state other than INIT.
const grpc_connectivity_state subchannel_connectivity_state =
grpc_subchannel_check_connectivity(subchannel, &error);
if (error != GRPC_ERROR_NONE) {
// The subchannel is in error (e.g. shutting down). Ignore it.
if (GRPC_TRACER_ON(*tracer)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG,
"[%s %p] subchannel for address uri %s shutting down, ignoring",
tracer->name, subchannel_list->policy, address_uri);
gpr_free(address_uri);
}
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
GRPC_ERROR_UNREF(error);
continue;
}
if (GRPC_TRACER_ON(*tracer)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s; "
"initial connectivity state: %s",
": Created subchannel %p for address uri %s",
tracer->name, p, subchannel_list, subchannel_index, subchannel,
address_uri,
grpc_connectivity_state_name(subchannel_connectivity_state));
address_uri);
gpr_free(address_uri);
}
grpc_lb_subchannel_data *sd =
@ -174,16 +153,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
connectivity_changed_cb, sd,
grpc_combiner_scheduler(args->combiner));
// Use some sentinel value outside of the range of
// grpc_connectivity_state to signal an undefined previous state.
sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
sd->curr_connectivity_state = subchannel_connectivity_state;
sd->pending_connectivity_state_unsafe = subchannel_connectivity_state;
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"ready_at_sl_creation");
}
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
@ -191,6 +165,7 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
}
}
subchannel_list->num_subchannels = subchannel_index;
subchannel_list->num_idle = subchannel_index;
return subchannel_list;
}

@ -97,7 +97,7 @@ struct grpc_lb_subchannel_list {
/** Index into subchannels of the one we're currently checking.
* Used when connecting to subchannels serially instead of in parallel. */
// TODO(roth): When we have time, we can probably make this go away
// and the index dynamically by subtracting
// and compute the index dynamically by subtracting
// subchannel_list->subchannels from the subchannel_data pointer.
size_t checking_subchannel;

@ -127,8 +127,8 @@ void grpc_connected_subchannel_process_transport_op(
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel, grpc_error **error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
/** Calls notify when the connectivity state of a channel becomes different
from *state. Updates *state with the new state of the channel. */
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,

@ -29,8 +29,6 @@ grpc_tracer_flag grpc_connectivity_state_trace =
const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) {
case GRPC_CHANNEL_INIT:
return "INIT";
case GRPC_CHANNEL_IDLE:
return "IDLE";
case GRPC_CHANNEL_CONNECTING:
@ -174,7 +172,6 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_name(state), reason, error, error_string);
}
switch (state) {
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:

@ -305,7 +305,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.clear();
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
@ -481,7 +481,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// An empty update will result in the channel going into TRANSIENT_FAILURE.
ports.clear();
SetNextResolution(ports);
grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);

Loading…
Cancel
Save