As per review

reviewable/pr16306/r2
Juanli Shen 6 years ago
parent 9b72650125
commit 186df431de
  1. 28
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 20
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  3. 21
      test/cpp/end2end/client_lb_end2end_test.cc

@ -84,7 +84,7 @@ class PickFirst : public LoadBalancingPolicy {
// Processes the connectivity change to READY for an unselected subchannel. // Processes the connectivity change to READY for an unselected subchannel.
void ProcessUnselectedReadyLocked(); void ProcessUnselectedReadyLocked();
void StartConnectivityWatchLocked() override; void CheckConnectivityStateAndStartWatchingLocked();
}; };
class PickFirstSubchannelList class PickFirstSubchannelList
@ -252,7 +252,8 @@ void PickFirst::StartPickingLocked() {
if (subchannel_list_ != nullptr) { if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); subchannel_list_->subchannel(i)
->CheckConnectivityStateAndStartWatchingLocked();
break; break;
} }
} }
@ -391,7 +392,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first // If we've started picking, start trying to connect to the first
// subchannel in the new list. // subchannel in the new list.
if (started_picking_) { if (started_picking_) {
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
} }
} else { } else {
// We do have a selected subchannel. // We do have a selected subchannel.
@ -445,7 +447,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel in the new list. // subchannel in the new list.
if (started_picking_) { if (started_picking_) {
latest_pending_subchannel_list_->subchannel(0) latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked(); ->CheckConnectivityStateAndStartWatchingLocked();
} }
} }
} }
@ -545,7 +547,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "exhausted_subchannels"); GRPC_ERROR_REF(error), "exhausted_subchannels");
} }
sd->StartConnectivityWatchLocked(); sd->CheckConnectivityStateAndStartWatchingLocked();
break; break;
} }
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
@ -568,8 +570,15 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
GPR_ASSERT(p->selected_ != this); if (p->selected_ == this) {
GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY); if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_ERROR,
"Pick First %p calling ProcessUnselectedReadyLocked() on "
"selected subchannel %p",
p, subchannel());
}
return;
}
// If we get here, there are two possible cases: // If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is // 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to // for a subchannel in p->subchannel_list_ that we're trying to
@ -614,7 +623,8 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
} }
} }
void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() { void PickFirst::PickFirstSubchannelData::
CheckConnectivityStateAndStartWatchingLocked() {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
if (p->selected_ != this && if (p->selected_ != this &&
@ -625,7 +635,7 @@ void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() {
ProcessUnselectedReadyLocked(); ProcessUnselectedReadyLocked();
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
SubchannelData::StartConnectivityWatchLocked(); StartConnectivityWatchLocked();
} }
// //

@ -115,7 +115,7 @@ class SubchannelData {
// Starts watching the connectivity state of the subchannel. // Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the // ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes. // connectivity state changes.
virtual void StartConnectivityWatchLocked(); void StartConnectivityWatchLocked();
// Renews watching the connectivity state of the subchannel. // Renews watching the connectivity state of the subchannel.
void RenewConnectivityWatchLocked(); void RenewConnectivityWatchLocked();
@ -154,10 +154,6 @@ class SubchannelData {
grpc_connectivity_state connectivity_state, grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT; grpc_error* error) GRPC_ABSTRACT;
// Returns the connectivity state. Must be called only while there is no
// connectivity notification pending.
grpc_connectivity_state connectivity_state() const;
private: private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported. // Returns true if the connectivity state should be reported.
@ -320,13 +316,6 @@ void SubchannelData<SubchannelListType,
} }
} }
// Accessor for the subchannel's connectivity state.
//
// Hands back the value currently stored in
// pending_connectivity_state_unsafe_. The assertion below enforces the
// precondition that no connectivity notification is pending at the time of
// the call.
template <typename SubchannelListType, typename SubchannelDataType>
grpc_connectivity_state
SubchannelData<SubchannelListType, SubchannelDataType>::connectivity_state()
    const {
  // Precondition: callers may only query the state while no notification is
  // outstanding.
  GPR_ASSERT(!connectivity_notification_pending_);
  const grpc_connectivity_state current_state =
      pending_connectivity_state_unsafe_;
  return current_state;
}
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, void SubchannelData<SubchannelListType,
SubchannelDataType>::StartConnectivityWatchLocked() { SubchannelDataType>::StartConnectivityWatchLocked() {
@ -361,8 +350,7 @@ void SubchannelData<SubchannelListType,
subchannel_, subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
} }
GPR_ASSERT(!connectivity_notification_pending_); GPR_ASSERT(connectivity_notification_pending_);
connectivity_notification_pending_ = true;
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(), subchannel_, subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_); &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@ -379,7 +367,8 @@ void SubchannelData<SubchannelListType,
subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_); subchannel_);
} }
GPR_ASSERT(!connectivity_notification_pending_); GPR_ASSERT(connectivity_notification_pending_);
connectivity_notification_pending_ = false;
subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
} }
@ -453,7 +442,6 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
grpc_error_string(error), sd->subchannel_list_->shutting_down()); grpc_error_string(error), sd->subchannel_list_->shutting_down());
} }
sd->connectivity_notification_pending_ = false;
// If shutting down, unref subchannel and stop watching. // If shutting down, unref subchannel and stop watching.
if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
sd->UnrefSubchannelLocked("connectivity_shutdown"); sd->UnrefSubchannelLocked("connectivity_shutdown");

@ -291,6 +291,17 @@ class ClientLbEnd2endTest : public ::testing::Test {
ResetCounters(); ResetCounters();
} }
// Waits for `channel` to report a connectivity state other than
// GRPC_CHANNEL_READY.
//
// channel:         the channel whose state is polled.
// timeout_seconds: converted to a deadline once, up front, via
//                  grpc_timeout_seconds_to_deadline(); defaults to 5.
//
// Returns true as soon as GetState() yields anything other than
// GRPC_CHANNEL_READY, and false if WaitForStateChange() returns false first
// (presumably because the deadline expired -- confirm against the Channel
// API).
bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
  const gpr_timespec deadline =
      grpc_timeout_seconds_to_deadline(timeout_seconds);
  for (;;) {
    // Query the state without asking the channel to (re)connect.
    const grpc_connectivity_state observed =
        channel->GetState(false /* try_to_connect */);
    if (observed != GRPC_CHANNEL_READY) {
      return true;
    }
    // Still READY: wait for the state to move away from `observed`.
    if (!channel->WaitForStateChange(observed, deadline)) {
      return false;
    }
  }
}
bool SeenAllServers() { bool SeenAllServers() {
for (const auto& server : servers_) { for (const auto& server : servers_) {
if (server->service_.request_count() == 0) return false; if (server->service_.request_count() == 0) return false;
@ -590,18 +601,22 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
gpr_log(GPR_INFO, "****** SERVER RESTARTED *******"); gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
auto channel_2 = BuildChannel("pick_first"); auto channel_2 = BuildChannel("pick_first");
auto stub_2 = BuildStub(channel_2); auto stub_2 = BuildStub(channel_2);
// TODO(juanlishen): This resolution result will only be visible to channel 2
// since the response generator is only associated with channel 2 now. We
// should change the response generator to be able to deliver updates to
// multiple channels at once.
SetNextResolution(ports); SetNextResolution(ports);
gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******"); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
WaitForServer(stub_2, 0, DEBUG_LOCATION, true); WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******"); gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
servers_[0]->Shutdown(); servers_[0]->Shutdown();
// Wait until the disconnection has triggered the connectivity notification.
// Otherwise, the subchannel may be picked for next call but will fail soon.
EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
// Channel 2 will also receive a re-resolution containing the same server. // Channel 2 will also receive a re-resolution containing the same server.
// Both channels will ref the same subchannel that failed. // Both channels will ref the same subchannel that failed.
servers_.clear(); servers_.clear();
StartServers(1, ports); StartServers(1, ports);
// Wait for a while so that the disconnection has triggered the connectivity
// notification. Otherwise, the subchannel may be picked but will fail soon.
sleep(1);
gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******"); gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******"); gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
// The first call after the server restart will succeed. // The first call after the server restart will succeed.

Loading…
Cancel
Save