PF: Check connectivity state before watching

reviewable/pr16306/r1
Juanli Shen 6 years ago
parent 4bdb0e398c
commit 9b72650125
  1. 103
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 20
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  3. 40
      test/cpp/end2end/client_lb_end2end_test.cc

@ -80,6 +80,11 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
// Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked();
void StartConnectivityWatchLocked() override;
};
class PickFirstSubchannelList
@ -519,41 +524,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
pick->connected_subchannel =
p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
p->selected_->subchannel());
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
@ -595,6 +566,68 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
}
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
GPR_ASSERT(p->selected_ != this);
GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY);
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
subchannel_list() == p->latest_pending_subchannel_list_.get());
// Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "subchannel_ready");
p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
p->selected_->subchannel());
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
grpc_error* error = GRPC_ERROR_NONE;
if (p->selected_ != this &&
CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
// We must process the READY subchannel before we start watching it.
// Otherwise, we won't know it's READY because we will be waiting for its
// connectivity state to change from READY.
ProcessUnselectedReadyLocked();
}
GRPC_ERROR_UNREF(error);
SubchannelData::StartConnectivityWatchLocked();
}
//
// factory
//

@ -115,7 +115,7 @@ class SubchannelData {
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes.
void StartConnectivityWatchLocked();
virtual void StartConnectivityWatchLocked();
// Renews watching the connectivity state of the subchannel.
void RenewConnectivityWatchLocked();
@ -154,6 +154,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
// Returns the connectivity state. Must be called only while there is no
// connectivity notification pending.
grpc_connectivity_state connectivity_state() const;
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.
@ -316,6 +320,13 @@ void SubchannelData<SubchannelListType,
}
}
// Returns the subchannel's connectivity state as stored in
// pending_connectivity_state_unsafe_.
//
// Must be called only while no connectivity notification is pending; this is
// enforced with an assertion.
// NOTE(review): the same field's address is handed to
// grpc_subchannel_notify_on_state_change() when a watch is started, so it is
// presumably written asynchronously while a notification is pending -- confirm.
template <typename SubchannelListType, typename SubchannelDataType>
grpc_connectivity_state SubchannelData<
SubchannelListType, SubchannelDataType>::connectivity_state() const {
// Reading the field is only permitted when no notification is outstanding.
GPR_ASSERT(!connectivity_notification_pending_);
return pending_connectivity_state_unsafe_;
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::StartConnectivityWatchLocked() {
@ -350,7 +361,8 @@ void SubchannelData<SubchannelListType,
subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
}
GPR_ASSERT(connectivity_notification_pending_);
GPR_ASSERT(!connectivity_notification_pending_);
connectivity_notification_pending_ = true;
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@ -367,8 +379,7 @@ void SubchannelData<SubchannelListType,
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_);
}
GPR_ASSERT(connectivity_notification_pending_);
connectivity_notification_pending_ = false;
GPR_ASSERT(!connectivity_notification_pending_);
subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
}
@ -442,6 +453,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
grpc_error_string(error), sd->subchannel_list_->shutting_down());
}
sd->connectivity_notification_pending_ = false;
// If shutting down, unref subchannel and stop watching.
if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
sd->UnrefSubchannelLocked("connectivity_shutdown");

@ -573,6 +573,46 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
// Exercises pick_first when two channels target the same server across
// server restarts: after the second restart, the first RPC sent on channel 2
// is expected to succeed.
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
  std::vector<int> ports{grpc_pick_unused_port_or_die()};
  // Shuts the only server down and brings a fresh one up on the same port.
  auto restart_server = [this, &ports]() {
    servers_[0]->Shutdown();
    servers_.clear();
    StartServers(1, ports);
  };

  StartServers(1, ports);

  // Connect channel 1.
  auto channel_1 = BuildChannel("pick_first");
  auto stub_1 = BuildStub(channel_1);
  SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
  WaitForServer(stub_1, 0, DEBUG_LOCATION);
  gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");

  // First restart. Channel 1 will get a re-resolution that contains the same
  // server, create a new subchannel for it, and hold a ref to that
  // subchannel.
  restart_server();
  gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");

  // Connect channel 2.
  auto channel_2 = BuildChannel("pick_first");
  auto stub_2 = BuildStub(channel_2);
  SetNextResolution(ports);
  gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
  WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
  gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");

  // Second restart. Channel 2 will also get a re-resolution that contains the
  // same server, so both channels end up holding a ref to the same subchannel
  // that failed.
  restart_server();
  // Give the disconnection time to trigger the connectivity notification;
  // otherwise the subchannel may be picked but will fail soon after.
  sleep(1);
  gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");

  // The first call after the server restart must succeed.
  gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
  CheckRpcSendOk(stub_2, DEBUG_LOCATION);
  gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");

  // Both channels must report pick_first as their LB policy.
  EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
  EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

Loading…
Cancel
Save