Remove error from connectivity state tracking.

pull/18628/head
Mark D. Roth 6 years ago
parent 9144c6ab16
commit 432c97e1ba
  1. 34
      src/core/ext/filters/client_channel/client_channel.cc
  2. 3
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  3. 1
      src/core/ext/filters/client_channel/lb_policy.h
  4. 24
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 67
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 48
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 12
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  8. 22
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  9. 24
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  10. 40
      src/core/ext/filters/client_channel/subchannel.cc
  11. 5
      src/core/ext/filters/client_channel/subchannel.h
  12. 18
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 6
      src/core/ext/transport/inproc/inproc_transport.cc
  14. 42
      src/core/lib/transport/connectivity_state.cc
  15. 8
      src/core/lib/transport/connectivity_state.h
  16. 4
      test/core/transport/connectivity_state_test.cc
  17. 7
      test/core/util/test_lb_policies.cc

@ -227,13 +227,11 @@ namespace {
class ConnectivityStateAndPickerSetter { class ConnectivityStateAndPickerSetter {
public: public:
ConnectivityStateAndPickerSetter( ConnectivityStateAndPickerSetter(
channel_data* chand, grpc_connectivity_state state, channel_data* chand, grpc_connectivity_state state, const char* reason,
grpc_error* state_error, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) { : chand_(chand), picker_(std::move(picker)) {
// Update connectivity state here, while holding control plane combiner. // Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker, state, state_error, grpc_connectivity_state_set(&chand->state_tracker, state, reason);
reason);
if (chand->channelz_node != nullptr) { if (chand->channelz_node != nullptr) {
chand->channelz_node->AddTraceEvent( chand->channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info, channelz::ChannelTrace::Severity::Info,
@ -456,7 +454,7 @@ class ClientChannelControlHelper
} }
void UpdateState( void UpdateState(
grpc_connectivity_state state, grpc_error* state_error, grpc_connectivity_state state,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override { UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
grpc_error* disconnect_error = grpc_error* disconnect_error =
chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE); chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
@ -464,17 +462,14 @@ class ClientChannelControlHelper
const char* extra = disconnect_error == GRPC_ERROR_NONE const char* extra = disconnect_error == GRPC_ERROR_NONE
? "" ? ""
: " (ignoring -- channel shutting down)"; : " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s", gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
chand_, grpc_connectivity_state_name(state), grpc_connectivity_state_name(state), picker.get(), extra);
grpc_error_string(state_error), picker.get(), extra);
} }
// Do update only if not shutting down. // Do update only if not shutting down.
if (disconnect_error == GRPC_ERROR_NONE) { if (disconnect_error == GRPC_ERROR_NONE) {
// Will delete itself. // Will delete itself.
New<ConnectivityStateAndPickerSetter>(chand_, state, state_error, New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
"helper", std::move(picker)); std::move(picker));
} else {
GRPC_ERROR_UNREF(state_error);
} }
} }
@ -524,16 +519,12 @@ static bool process_resolver_result_locked(
} }
static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) { static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) {
grpc_error* error = GRPC_ERROR_NONE; if (grpc_connectivity_state_check(&chand->state_tracker) !=
grpc_connectivity_state state = GRPC_CHANNEL_READY) {
grpc_connectivity_state_get(&chand->state_tracker, &error); return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
if (state != GRPC_CHANNEL_READY) {
grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"channel not connected", &error, 1);
GRPC_ERROR_UNREF(error);
return new_error;
} }
LoadBalancingPolicy::PickArgs pick; LoadBalancingPolicy::PickArgs pick;
grpc_error* error = GRPC_ERROR_NONE;
chand->picker->Pick(&pick, &error); chand->picker->Pick(&pick, &error);
if (pick.connected_subchannel != nullptr) { if (pick.connected_subchannel != nullptr) {
pick.connected_subchannel->Ping(op->send_ping.on_initiate, pick.connected_subchannel->Ping(op->send_ping.on_initiate,
@ -587,8 +578,7 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
chand->resolving_lb_policy.reset(); chand->resolving_lb_policy.reset();
// Will delete itself. // Will delete itself.
grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>( grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
"shutdown from API",
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>( grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>( grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(op->disconnect_with_error)))); GRPC_ERROR_REF(op->disconnect_with_error))));

@ -127,8 +127,7 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
if (subchannel_ == nullptr) { if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN; state = GRPC_CHANNEL_SHUTDOWN;
} else { } else {
state = subchannel_->CheckConnectivity(nullptr, state = subchannel_->CheckConnectivity(true /* inhibit_health_checking */);
true /* inhibit_health_checking */);
} }
json = grpc_json_create_child(nullptr, json, "state", nullptr, json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);

@ -185,7 +185,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Sets the connectivity state and returns a new picker to be used /// Sets the connectivity state and returns a new picker to be used
/// by the client channel. /// by the client channel.
virtual void UpdateState(grpc_connectivity_state state, virtual void UpdateState(grpc_connectivity_state state,
grpc_error* state_error,
UniquePtr<SubchannelPicker>) GRPC_ABSTRACT; UniquePtr<SubchannelPicker>) GRPC_ABSTRACT;
/// Requests that the resolver re-resolve. /// Requests that the resolver re-resolve.

@ -282,7 +282,7 @@ class GrpcLb : public LoadBalancingPolicy {
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override; UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
@ -622,12 +622,8 @@ grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
} }
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) { UniquePtr<SubchannelPicker> picker) {
if (parent_->shutting_down_) { if (parent_->shutting_down_) return;
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until // If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place. // it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) { if (CalledByPendingChild()) {
@ -637,10 +633,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
parent_.get(), this, parent_->pending_child_policy_.get(), parent_.get(), this, parent_->pending_child_policy_.get(),
grpc_connectivity_state_name(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) { if (state != GRPC_CHANNEL_READY) return;
GRPC_ERROR_UNREF(state_error);
return;
}
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(), parent_->child_policy_->interested_parties(),
parent_->interested_parties()); parent_->interested_parties());
@ -648,7 +641,6 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
parent_->child_policy_ = std::move(parent_->pending_child_policy_); parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) { } else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it. // This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return; return;
} }
// Record whether child policy reports READY. // Record whether child policy reports READY.
@ -683,8 +675,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
parent_.get(), this, grpc_connectivity_state_name(state), parent_.get(), this, grpc_connectivity_state_name(state),
picker.get()); picker.get());
} }
parent_->channel_control_helper()->UpdateState(state, state_error, parent_->channel_control_helper()->UpdateState(state, std::move(picker));
std::move(picker));
return; return;
} }
// Cases 2 and 3a: wrap picker from the child in our own picker. // Cases 2 and 3a: wrap picker from the child in our own picker.
@ -699,10 +690,9 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
client_stats = parent_->lb_calld_->client_stats()->Ref(); client_stats = parent_->lb_calld_->client_stats()->Ref();
} }
parent_->channel_control_helper()->UpdateState( parent_->channel_control_helper()->UpdateState(
state, state_error, state, UniquePtr<SubchannelPicker>(
UniquePtr<SubchannelPicker>( New<Picker>(parent_.get(), parent_->serverlist_,
New<Picker>(parent_.get(), parent_->serverlist_, std::move(picker), std::move(picker), std::move(client_stats))));
std::move(client_stats))));
} }
void GrpcLb::Helper::RequestReresolution() { void GrpcLb::Helper::RequestReresolution() {

@ -73,7 +73,7 @@ class PickFirst : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, address, subchannel, combiner) {} : SubchannelData(subchannel_list, address, subchannel, combiner) {}
void ProcessConnectivityChangeLocked( void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override; grpc_connectivity_state connectivity_state) override;
// Processes the connectivity change to READY for an unselected subchannel. // Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked(); void ProcessUnselectedReadyLocked();
@ -191,10 +191,11 @@ void PickFirst::ExitIdleLocked() {
idle_ = false; idle_ = false;
if (subchannel_list_ == nullptr || if (subchannel_list_ == nullptr ||
subchannel_list_->num_subchannels() == 0) { subchannel_list_->num_subchannels() == 0) {
grpc_error* error = grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No addresses to connect to"); GRPC_ERROR_CREATE_FROM_STATIC_STRING("No addresses to connect to"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
} else { } else {
subchannel_list_->subchannel(0) subchannel_list_->subchannel(0)
@ -268,9 +269,11 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// haven't gotten a non-empty update by the time the application tries // haven't gotten a non-empty update by the time the application tries
// to start a new call.) // to start a new call.)
if (!idle_) { if (!idle_) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"); grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
} }
return; return;
@ -284,9 +287,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// check and instead do it in ExitIdleLocked(). // check and instead do it in ExitIdleLocked().
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i); PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_error* error = GRPC_ERROR_NONE; grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error);
GRPC_ERROR_UNREF(error);
if (state == GRPC_CHANNEL_READY) { if (state == GRPC_CHANNEL_READY) {
subchannel_list_ = std::move(subchannel_list); subchannel_list_ = std::move(subchannel_list);
sd->StartConnectivityWatchLocked(); sd->StartConnectivityWatchLocked();
@ -340,7 +341,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
} }
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) { grpc_connectivity_state connectivity_state) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
AutoChildRefsUpdater guard(p); AutoChildRefsUpdater guard(p);
// The notification must be for a subchannel in either the current or // The notification must be for a subchannel in either the current or
@ -371,17 +372,16 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list. // Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) { if (p->subchannel_list_->in_transient_failure()) {
grpc_error* new_error = grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"selected subchannel failed; switching to pending update", "selected subchannel failed; switching to pending update"),
&error, 1); GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>( UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
New<TransientFailurePicker>(new_error)));
} else { } else {
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
} }
} else { } else {
@ -395,7 +395,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->selected_ = nullptr; p->selected_ = nullptr;
StopConnectivityWatchLocked(); StopConnectivityWatchLocked();
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, GRPC_CHANNEL_IDLE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
} else { } else {
// This is unlikely but can happen when a subchannel has been asked // This is unlikely but can happen when a subchannel has been asked
@ -403,19 +403,17 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// some connectivity state notifications. // some connectivity state notifications.
if (connectivity_state == GRPC_CHANNEL_READY) { if (connectivity_state == GRPC_CHANNEL_READY) {
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, GRPC_ERROR_NONE, GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(
UniquePtr<SubchannelPicker>( connected_subchannel()->Ref())));
New<Picker>(connected_subchannel()->Ref())));
} else { // CONNECTING } else { // CONNECTING
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
connectivity_state, GRPC_ERROR_REF(error), connectivity_state,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
} }
// Renew notification. // Renew notification.
RenewConnectivityWatchLocked(); RenewConnectivityWatchLocked();
} }
} }
GRPC_ERROR_UNREF(error);
return; return;
} }
// If we get here, there are two possible cases: // If we get here, there are two possible cases:
@ -452,13 +450,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->set_in_transient_failure(true); subchannel_list()->set_in_transient_failure(true);
// Only report new state in case 1. // Only report new state in case 1.
if (subchannel_list() == p->subchannel_list_.get()) { if (subchannel_list() == p->subchannel_list_.get()) {
grpc_error* new_error = grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to connect to all addresses", &error, 1); "failed to connect to all addresses"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>( UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
New<TransientFailurePicker>(new_error)));
} }
} }
sd->CheckConnectivityStateAndStartWatchingLocked(); sd->CheckConnectivityStateAndStartWatchingLocked();
@ -469,7 +467,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// Only update connectivity state in case 1. // Only update connectivity state in case 1.
if (subchannel_list() == p->subchannel_list_.get()) { if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
} }
// Renew notification. // Renew notification.
@ -479,7 +477,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break); GPR_UNREACHABLE_CODE(break);
} }
GRPC_ERROR_UNREF(error);
} }
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
@ -509,7 +506,7 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
// Cases 1 and 2. // Cases 1 and 2.
p->selected_ = this; p->selected_ = this;
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, GRPC_ERROR_NONE, GRPC_CHANNEL_READY,
UniquePtr<SubchannelPicker>(New<Picker>(connected_subchannel()->Ref()))); UniquePtr<SubchannelPicker>(New<Picker>(connected_subchannel()->Ref())));
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
@ -520,9 +517,7 @@ void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked() { CheckConnectivityStateAndStartWatchingLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// Check current state. // Check current state.
grpc_error* error = GRPC_ERROR_NONE; grpc_connectivity_state current_state = CheckConnectivityStateLocked();
grpc_connectivity_state current_state = CheckConnectivityStateLocked(&error);
GRPC_ERROR_UNREF(error);
// Start watch. // Start watch.
StartConnectivityWatchLocked(); StartConnectivityWatchLocked();
// If current state is READY, select the subchannel now, since we started // If current state is READY, select the subchannel now, since we started

@ -92,11 +92,11 @@ class RoundRobin : public LoadBalancingPolicy {
} }
void UpdateConnectivityStateLocked( void UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error); grpc_connectivity_state connectivity_state);
private: private:
void ProcessConnectivityChangeLocked( void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override; grpc_connectivity_state connectivity_state) override;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
}; };
@ -119,7 +119,6 @@ class RoundRobin : public LoadBalancingPolicy {
} }
~RoundRobinSubchannelList() { ~RoundRobinSubchannelList() {
GRPC_ERROR_UNREF(last_transient_failure_error_);
RoundRobin* p = static_cast<RoundRobin*>(policy()); RoundRobin* p = static_cast<RoundRobin*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list"); p->Unref(DEBUG_LOCATION, "subchannel_list");
} }
@ -129,11 +128,8 @@ class RoundRobin : public LoadBalancingPolicy {
// Updates the counters of subchannels in each state when a // Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state. // subchannel transitions from old_state to new_state.
// transient_failure_error is the error that is reported when
// new_state is TRANSIENT_FAILURE.
void UpdateStateCountersLocked(grpc_connectivity_state old_state, void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state, grpc_connectivity_state new_state);
grpc_error* transient_failure_error);
// If this subchannel list is the RR policy's current subchannel // If this subchannel list is the RR policy's current subchannel
// list, updates the RR policy's connectivity state based on the // list, updates the RR policy's connectivity state based on the
@ -148,7 +144,6 @@ class RoundRobin : public LoadBalancingPolicy {
size_t num_ready_ = 0; size_t num_ready_ = 0;
size_t num_connecting_ = 0; size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0; size_t num_transient_failure_ = 0;
grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
}; };
class Picker : public SubchannelPicker { class Picker : public SubchannelPicker {
@ -317,11 +312,10 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
// subchannel already used by some other channel may have a non-IDLE // subchannel already used by some other channel may have a non-IDLE
// state. // state.
for (size_t i = 0; i < num_subchannels(); ++i) { for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_connectivity_state state = grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked(&error); subchannel(i)->CheckConnectivityStateLocked();
if (state != GRPC_CHANNEL_IDLE) { if (state != GRPC_CHANNEL_IDLE) {
subchannel(i)->UpdateConnectivityStateLocked(state, error); subchannel(i)->UpdateConnectivityStateLocked(state);
} }
} }
// Start connectivity watch for each subchannel. // Start connectivity watch for each subchannel.
@ -335,8 +329,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
} }
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state, grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
grpc_error* transient_failure_error) {
GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (old_state == GRPC_CHANNEL_READY) { if (old_state == GRPC_CHANNEL_READY) {
@ -356,8 +349,6 @@ void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++num_transient_failure_; ++num_transient_failure_;
} }
GRPC_ERROR_UNREF(last_transient_failure_error_);
last_transient_failure_error_ = transient_failure_error;
} }
// Sets the RR policy's connectivity state and generates a new picker based // Sets the RR policy's connectivity state and generates a new picker based
@ -384,20 +375,21 @@ void RoundRobin::RoundRobinSubchannelList::
if (num_ready_ > 0) { if (num_ready_ > 0) {
/* 1) READY */ /* 1) READY */
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, GRPC_ERROR_NONE, GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(p, this)));
UniquePtr<SubchannelPicker>(New<Picker>(p, this)));
} else if (num_connecting_ > 0) { } else if (num_connecting_ > 0) {
/* 2) CONNECTING */ /* 2) CONNECTING */
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
} else if (num_transient_failure_ == num_subchannels()) { } else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */ /* 3) TRANSIENT_FAILURE */
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to all backends failing"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
p->channel_control_helper()->UpdateState( p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(last_transient_failure_error_), UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(
GRPC_ERROR_REF(last_transient_failure_error_))));
} }
} }
@ -432,7 +424,7 @@ void RoundRobin::RoundRobinSubchannelList::
} }
void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) { grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log( gpr_log(
@ -445,12 +437,12 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state_name(connectivity_state)); grpc_connectivity_state_name(connectivity_state));
} }
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state, error); connectivity_state);
last_connectivity_state_ = connectivity_state; last_connectivity_state_ = connectivity_state;
} }
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) { grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr); GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE, re-resolve. // If the new state is TRANSIENT_FAILURE, re-resolve.
@ -470,7 +462,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// Renew connectivity watch. // Renew connectivity watch.
RenewConnectivityWatchLocked(); RenewConnectivityWatchLocked();
// Update state counters. // Update state counters.
UpdateConnectivityStateLocked(connectivity_state, error); UpdateConnectivityStateLocked(connectivity_state);
// Update overall state and renew notification. // Update overall state and renew notification.
subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
} }
@ -494,9 +486,11 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
if (latest_pending_subchannel_list_->num_subchannels() == 0) { if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the // If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE. // current list and transition to TRANSIENT_FAILURE.
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"); grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
subchannel_list_ = std::move(latest_pending_subchannel_list_); subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else if (subchannel_list_ == nullptr) { } else if (subchannel_list_ == nullptr) {

@ -51,7 +51,7 @@ class MySubchannelData
: public SubchannelData<MySubchannelList, MySubchannelData> { : public SubchannelData<MySubchannelList, MySubchannelData> {
public: public:
void ProcessConnectivityChangeLocked( void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override { grpc_connectivity_state connectivity_state) override {
// ...code to handle connectivity changes... // ...code to handle connectivity changes...
} }
}; };
@ -101,10 +101,10 @@ class SubchannelData {
// pending (i.e., between calling StartConnectivityWatchLocked() or // pending (i.e., between calling StartConnectivityWatchLocked() or
// RenewConnectivityWatchLocked() and the resulting invocation of // RenewConnectivityWatchLocked() and the resulting invocation of
// ProcessConnectivityChangeLocked()). // ProcessConnectivityChangeLocked()).
grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) { grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(!connectivity_notification_pending_); GPR_ASSERT(!connectivity_notification_pending_);
pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity( pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity(
error, subchannel_list_->inhibit_health_checking()); subchannel_list_->inhibit_health_checking());
UpdateConnectedSubchannelLocked(); UpdateConnectedSubchannelLocked();
return pending_connectivity_state_unsafe_; return pending_connectivity_state_unsafe_;
} }
@ -153,8 +153,7 @@ class SubchannelData {
// Implementations must invoke either RenewConnectivityWatchLocked() or // Implementations must invoke either RenewConnectivityWatchLocked() or
// StopConnectivityWatchLocked() before returning. // StopConnectivityWatchLocked() before returning.
virtual void ProcessConnectivityChangeLocked( virtual void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_connectivity_state connectivity_state) GRPC_ABSTRACT;
grpc_error* error) GRPC_ABSTRACT;
// Unrefs the subchannel. // Unrefs the subchannel.
void UnrefSubchannelLocked(const char* reason); void UnrefSubchannelLocked(const char* reason);
@ -462,8 +461,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
return; return;
} }
// Call the subclass's ProcessConnectivityChangeLocked() method. // Call the subclass's ProcessConnectivityChangeLocked() method.
sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_, sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_);
GRPC_ERROR_REF(error));
} }
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>

@ -301,7 +301,7 @@ class XdsLb : public LoadBalancingPolicy {
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target, grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override; const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override; UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; } void set_child(LoadBalancingPolicy* child) { child_ = child; }
@ -1565,6 +1565,7 @@ void XdsLb::LocalityMap::LocalityEntry::Orphan() {
// //
// LocalityEntry::Helper implementation // LocalityEntry::Helper implementation
// //
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const { bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr); GPR_ASSERT(child_ != nullptr);
return child_ == entry_->pending_child_policy_.get(); return child_ == entry_->pending_child_policy_.get();
@ -1594,12 +1595,8 @@ grpc_channel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateChannel(
} }
void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
grpc_connectivity_state state, grpc_error* state_error, grpc_connectivity_state state, UniquePtr<SubchannelPicker> picker) {
UniquePtr<SubchannelPicker> picker) { if (entry_->parent_->shutting_down_) return;
if (entry_->parent_->shutting_down_) {
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until // If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place. // it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) { if (CalledByPendingChild()) {
@ -1609,10 +1606,7 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
entry_->parent_.get(), this, entry_->pending_child_policy_.get(), entry_->parent_.get(), this, entry_->pending_child_policy_.get(),
grpc_connectivity_state_name(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) { if (state != GRPC_CHANNEL_READY) return;
GRPC_ERROR_UNREF(state_error);
return;
}
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
entry_->child_policy_->interested_parties(), entry_->child_policy_->interested_parties(),
entry_->parent_->interested_parties()); entry_->parent_->interested_parties());
@ -1620,7 +1614,6 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
entry_->child_policy_ = std::move(entry_->pending_child_policy_); entry_->child_policy_ = std::move(entry_->pending_child_policy_);
} else if (!CalledByCurrentChild()) { } else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it. // This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return; return;
} }
// TODO(juanlishen): When in fallback mode, pass the child picker // TODO(juanlishen): When in fallback mode, pass the child picker
@ -1632,9 +1625,8 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
? nullptr ? nullptr
: entry_->parent_->lb_chand_->lb_calld()->client_stats(); : entry_->parent_->lb_chand_->lb_calld()->client_stats();
entry_->parent_->channel_control_helper()->UpdateState( entry_->parent_->channel_control_helper()->UpdateState(
state, state_error, state, UniquePtr<SubchannelPicker>(
UniquePtr<SubchannelPicker>( New<Picker>(std::move(picker), std::move(client_stats))));
New<Picker>(std::move(picker), std::move(client_stats))));
} }
void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() { void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {

@ -119,13 +119,9 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
return parent_->channel_control_helper()->CreateChannel(target, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override { UniquePtr<SubchannelPicker> picker) override {
if (parent_->resolver_ == nullptr) { if (parent_->resolver_ == nullptr) return; // Shutting down.
// shutting down.
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until // If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place. // it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) { if (CalledByPendingChild()) {
@ -136,10 +132,7 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
parent_.get(), this, child_, parent_.get(), this, child_,
grpc_connectivity_state_name(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) { if (state != GRPC_CHANNEL_READY) return;
GRPC_ERROR_UNREF(state_error);
return;
}
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
parent_->lb_policy_->interested_parties(), parent_->lb_policy_->interested_parties(),
parent_->interested_parties()); parent_->interested_parties());
@ -147,11 +140,9 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
parent_->lb_policy_ = std::move(parent_->pending_lb_policy_); parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
} else if (!CalledByCurrentChild()) { } else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it. // This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return; return;
} }
parent_->channel_control_helper()->UpdateState(state, state_error, parent_->channel_control_helper()->UpdateState(state, std::move(picker));
std::move(picker));
} }
void RequestReresolution() override { void RequestReresolution() override {
@ -234,8 +225,7 @@ grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
} }
// Return our picker to the channel. // Return our picker to the channel.
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, GRPC_CHANNEL_IDLE, UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -313,7 +303,7 @@ void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
GPR_ASSERT(!started_resolving_); GPR_ASSERT(!started_resolving_);
started_resolving_ = true; started_resolving_ = true;
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref()))); UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
resolver_->StartLocked(); resolver_->StartLocked();
} }
@ -334,7 +324,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1); "Resolver transient failure", &error, 1);
channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(state_error), GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error))); UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);

@ -332,10 +332,9 @@ class Subchannel::ConnectedSubchannelStateWatcher
health_state = GRPC_CHANNEL_CONNECTING; health_state = GRPC_CHANNEL_CONNECTING;
} }
// Report initial state. // Report initial state.
c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, GRPC_ERROR_NONE, c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, "subchannel_connected");
"subchannel_connected");
grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state, grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state,
GRPC_ERROR_NONE, "subchannel_connected"); "subchannel_connected");
} }
~ConnectedSubchannelStateWatcher() { ~ConnectedSubchannelStateWatcher() {
@ -367,11 +366,10 @@ class Subchannel::ConnectedSubchannelStateWatcher
c->connected_subchannel_watcher_.reset(); c->connected_subchannel_watcher_.reset();
self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error),
"reflect_child"); "reflect_child");
grpc_connectivity_state_set(&c->state_and_health_tracker_, grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "reflect_child"); "reflect_child");
c->backoff_begun_ = false; c->backoff_begun_ = false;
c->backoff_.Reset(); c->backoff_.Reset();
c->MaybeStartConnectingLocked(); c->MaybeStartConnectingLocked();
@ -388,11 +386,11 @@ class Subchannel::ConnectedSubchannelStateWatcher
// from READY to CONNECTING or IDLE. // from READY to CONNECTING or IDLE.
self->last_connectivity_state_ = self->pending_connectivity_state_; self->last_connectivity_state_ = self->pending_connectivity_state_;
c->SetConnectivityStateLocked(self->pending_connectivity_state_, c->SetConnectivityStateLocked(self->pending_connectivity_state_,
GRPC_ERROR_REF(error), "reflect_child"); "reflect_child");
if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) { if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
grpc_connectivity_state_set(&c->state_and_health_tracker_, grpc_connectivity_state_set(&c->state_and_health_tracker_,
self->pending_connectivity_state_, self->pending_connectivity_state_,
GRPC_ERROR_REF(error), "reflect_child"); "reflect_child");
} }
c->connected_subchannel_->NotifyOnStateChange( c->connected_subchannel_->NotifyOnStateChange(
nullptr, &self->pending_connectivity_state_, nullptr, &self->pending_connectivity_state_,
@ -415,8 +413,7 @@ class Subchannel::ConnectedSubchannelStateWatcher
self->health_check_client_ != nullptr) { self->health_check_client_ != nullptr) {
if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) { if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
grpc_connectivity_state_set(&c->state_and_health_tracker_, grpc_connectivity_state_set(&c->state_and_health_tracker_,
self->health_state_, self->health_state_, "health_changed");
GRPC_ERROR_REF(error), "health_changed");
} }
self->health_check_client_->NotifyOnHealthChange( self->health_check_client_->NotifyOnHealthChange(
&self->health_state_, &self->on_health_changed_); &self->health_state_, &self->on_health_changed_);
@ -740,11 +737,10 @@ channelz::SubchannelNode* Subchannel::channelz_node() {
} }
grpc_connectivity_state Subchannel::CheckConnectivity( grpc_connectivity_state Subchannel::CheckConnectivity(
grpc_error** error, bool inhibit_health_checking) { bool inhibit_health_checking) {
MutexLock lock(&mu_);
grpc_connectivity_state_tracker* tracker = grpc_connectivity_state_tracker* tracker =
inhibit_health_checking ? &state_tracker_ : &state_and_health_tracker_; inhibit_health_checking ? &state_tracker_ : &state_and_health_tracker_;
grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); grpc_connectivity_state state = grpc_connectivity_state_check(tracker);
return state; return state;
} }
@ -852,7 +848,6 @@ const char* SubchannelConnectivityStateChangeString(
} // namespace } // namespace
void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
grpc_error* error,
const char* reason) { const char* reason) {
if (channelz_node_ != nullptr) { if (channelz_node_ != nullptr) {
channelz_node_->AddTraceEvent( channelz_node_->AddTraceEvent(
@ -860,7 +855,7 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
grpc_slice_from_static_string( grpc_slice_from_static_string(
SubchannelConnectivityStateChangeString(state))); SubchannelConnectivityStateChangeString(state)));
} }
grpc_connectivity_state_set(&state_tracker_, state, error, reason); grpc_connectivity_state_set(&state_tracker_, state, reason);
} }
void Subchannel::MaybeStartConnectingLocked() { void Subchannel::MaybeStartConnectingLocked() {
@ -935,11 +930,9 @@ void Subchannel::ContinueConnectingLocked() {
next_attempt_deadline_ = backoff_.NextAttemptTime(); next_attempt_deadline_ = backoff_.NextAttemptTime();
args.deadline = std::max(next_attempt_deadline_, min_deadline); args.deadline = std::max(next_attempt_deadline_, min_deadline);
args.channel_args = args_; args.channel_args = args_;
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, "connecting");
"connecting");
grpc_connectivity_state_set(&state_and_health_tracker_, grpc_connectivity_state_set(&state_and_health_tracker_,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING, "connecting");
"connecting");
grpc_connector_connect(connector_, &args, &connecting_result_, grpc_connector_connect(connector_, &args, &connecting_result_,
&on_connecting_finished_); &on_connecting_finished_);
} }
@ -956,16 +949,11 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
} else if (c->disconnected_) { } else if (c->disconnected_) {
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} else { } else {
const char* errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
error =
grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connect_failed"); "connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_, grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE, error, GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed"); "connect_failed");
c->MaybeStartConnectingLocked(); c->MaybeStartConnectingLocked();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");

@ -207,8 +207,7 @@ class Subchannel {
channelz::SubchannelNode* channelz_node(); channelz::SubchannelNode* channelz_node();
// Polls the current connectivity state of the subchannel. // Polls the current connectivity state of the subchannel.
grpc_connectivity_state CheckConnectivity(grpc_error** error, grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking);
bool inhibit_health_checking);
// When the connectivity state of the subchannel changes from \a *state, // When the connectivity state of the subchannel changes from \a *state,
// invokes \a notify and updates \a *state with the new state. // invokes \a notify and updates \a *state with the new state.
@ -241,7 +240,7 @@ class Subchannel {
// Sets the subchannel's connectivity state to \a state. // Sets the subchannel's connectivity state to \a state.
void SetConnectivityStateLocked(grpc_connectivity_state state, void SetConnectivityStateLocked(grpc_connectivity_state state,
grpc_error* error, const char* reason); const char* reason);
// Methods for connection. // Methods for connection.
void MaybeStartConnectingLocked(); void MaybeStartConnectingLocked();

@ -119,7 +119,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t);
static void connectivity_state_set(grpc_chttp2_transport* t, static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state, grpc_connectivity_state state,
grpc_error* error, const char* reason); const char* reason);
static void benign_reclaimer_locked(void* t, grpc_error* error); static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error); static void destructive_reclaimer_locked(void* t, grpc_error* error);
@ -592,8 +592,7 @@ static void close_transport_locked(grpc_chttp2_transport* t,
} }
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
t->closed_with_error = GRPC_ERROR_REF(error); t->closed_with_error = GRPC_ERROR_REF(error);
connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
"close_transport");
if (t->ping_state.is_delayed_ping_timer_set) { if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(&t->ping_state.delayed_ping_timer); grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
} }
@ -1171,8 +1170,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
/* lie: use transient failure from the transport to indicate goaway has been /* lie: use transient failure from the transport to indicate goaway has been
* received */ * received */
connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
GRPC_ERROR_REF(t->goaway_error), "got_goaway");
} }
static void maybe_start_some_streams(grpc_chttp2_transport* t) { static void maybe_start_some_streams(grpc_chttp2_transport* t) {
@ -1194,10 +1192,8 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
t->next_stream_id += 2; t->next_stream_id += 2;
if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
connectivity_state_set( connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
t, GRPC_CHANNEL_TRANSIENT_FAILURE, "no_more_stream_ids");
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
"no_more_stream_ids");
} }
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@ -2804,9 +2800,9 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
static void connectivity_state_set(grpc_chttp2_transport* t, static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state, grpc_connectivity_state state,
grpc_error* error, const char* reason) { const char* reason) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "set connectivity_state=%d", state)); GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "set connectivity_state=%d", state));
grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error, grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
reason); reason);
} }

@ -1088,10 +1088,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
void close_transport_locked(inproc_transport* t) { void close_transport_locked(inproc_transport* t) {
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
grpc_connectivity_state_set( grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
&t->connectivity, GRPC_CHANNEL_SHUTDOWN, "close transport");
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."),
"close transport");
if (!t->is_closed) { if (!t->is_closed) {
t->is_closed = true; t->is_closed = true;
/* Also end all streams on this transport */ /* Also end all streams on this transport */

@ -48,7 +48,6 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state init_state, grpc_connectivity_state init_state,
const char* name) { const char* name) {
gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
tracker->current_error = GRPC_ERROR_NONE;
tracker->watchers = nullptr; tracker->watchers = nullptr;
tracker->name = gpr_strdup(name); tracker->name = gpr_strdup(name);
} }
@ -69,7 +68,6 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
GRPC_CLOSURE_SCHED(w->notify, error); GRPC_CLOSURE_SCHED(w->notify, error);
gpr_free(w); gpr_free(w);
} }
GRPC_ERROR_UNREF(tracker->current_error);
gpr_free(tracker->name); gpr_free(tracker->name);
} }
@ -84,20 +82,6 @@ grpc_connectivity_state grpc_connectivity_state_check(
return cur; return cur;
} }
grpc_connectivity_state grpc_connectivity_state_get(
grpc_connectivity_state_tracker* tracker, grpc_error** error) {
grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (grpc_connectivity_state_trace.enabled()) {
gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
grpc_connectivity_state_name(cur));
}
if (error != nullptr) {
*error = GRPC_ERROR_REF(tracker->current_error);
}
return cur;
}
bool grpc_connectivity_state_has_watchers( bool grpc_connectivity_state_has_watchers(
grpc_connectivity_state_tracker* connectivity_state) { grpc_connectivity_state_tracker* connectivity_state) {
return connectivity_state->watchers != nullptr; return connectivity_state->watchers != nullptr;
@ -140,7 +124,7 @@ bool grpc_connectivity_state_notify_on_state_change(
} else { } else {
if (cur != *current) { if (cur != *current) {
*current = cur; *current = cur;
GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error)); GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_NONE);
} else { } else {
grpc_connectivity_state_watcher* w = grpc_connectivity_state_watcher* w =
static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w))); static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
@ -155,29 +139,15 @@ bool grpc_connectivity_state_notify_on_state_change(
void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state state, grpc_connectivity_state state,
grpc_error* error, const char* reason) { const char* reason) {
grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm)); gpr_atm_no_barrier_load(&tracker->current_state_atm));
grpc_connectivity_state_watcher* w; grpc_connectivity_state_watcher* w;
if (grpc_connectivity_state_trace.enabled()) { if (grpc_connectivity_state_trace.enabled()) {
const char* error_string = grpc_error_string(error); gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name,
gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, grpc_connectivity_state_name(cur),
tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason);
grpc_connectivity_state_name(state), reason, error, error_string);
}
switch (state) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
GPR_ASSERT(error == GRPC_ERROR_NONE);
break;
case GRPC_CHANNEL_SHUTDOWN:
case GRPC_CHANNEL_TRANSIENT_FAILURE:
GPR_ASSERT(error != GRPC_ERROR_NONE);
break;
} }
GRPC_ERROR_UNREF(tracker->current_error);
tracker->current_error = error;
if (cur == state) { if (cur == state) {
return; return;
} }
@ -189,7 +159,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
if (grpc_connectivity_state_trace.enabled()) { if (grpc_connectivity_state_trace.enabled()) {
gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
} }
GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error)); GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_NONE);
gpr_free(w); gpr_free(w);
} }
} }

@ -37,8 +37,6 @@ typedef struct grpc_connectivity_state_watcher {
typedef struct { typedef struct {
/** current grpc_connectivity_state */ /** current grpc_connectivity_state */
gpr_atm current_state_atm; gpr_atm current_state_atm;
/** error associated with state */
grpc_error* current_error;
/** all our watchers */ /** all our watchers */
grpc_connectivity_state_watcher* watchers; grpc_connectivity_state_watcher* watchers;
/** a name to help debugging */ /** a name to help debugging */
@ -59,7 +57,6 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker);
* external lock */ * external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state state, grpc_connectivity_state state,
grpc_error* associated_error,
const char* reason); const char* reason);
/** Return true if this connectivity state has watchers. /** Return true if this connectivity state has watchers.
@ -71,11 +68,6 @@ bool grpc_connectivity_state_has_watchers(
grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker* tracker); grpc_connectivity_state_tracker* tracker);
/** Return the last seen connectivity state, and the associated error.
Access must be serialized with an external lock. */
grpc_connectivity_state grpc_connectivity_state_get(
grpc_connectivity_state_tracker* tracker, grpc_error** error);
/** Return 1 if the channel should start connecting, 0 otherwise. /** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that If current==NULL cancel notify if it is already queued (success==0 in that
case). case).

@ -60,13 +60,9 @@ static void test_connectivity_state_name(void) {
static void test_check(void) { static void test_check(void) {
grpc_connectivity_state_tracker tracker; grpc_connectivity_state_tracker tracker;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_error* error;
gpr_log(GPR_DEBUG, "test_check"); gpr_log(GPR_DEBUG, "test_check");
grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx"); grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
GPR_ASSERT(grpc_connectivity_state_get(&tracker, &error) ==
GRPC_CHANNEL_IDLE);
GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE); GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_connectivity_state_destroy(&tracker); grpc_connectivity_state_destroy(&tracker);
} }

@ -150,12 +150,11 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
return parent_->channel_control_helper()->CreateChannel(target, args); return parent_->channel_control_helper()->CreateChannel(target, args);
} }
void UpdateState(grpc_connectivity_state state, grpc_error* state_error, void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override { UniquePtr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState( parent_->channel_control_helper()->UpdateState(
state, state_error, state, UniquePtr<SubchannelPicker>(
UniquePtr<SubchannelPicker>( New<Picker>(std::move(picker), cb_, user_data_)));
New<Picker>(std::move(picker), cb_, user_data_)));
} }
void RequestReresolution() override { void RequestReresolution() override {

Loading…
Cancel
Save