Revert "Revert "Revert "grpclb re-resolution"""

pull/14412/head
David G. Quintas 7 years ago committed by GitHub
parent 1d168541fd
commit f66cf0d70d
  1. include/grpc/impl/codegen/grpc_types.h (2 lines changed)
  2. src/core/ext/filters/client_channel/lb_policy.cc (7 lines changed)
  3. src/core/ext/filters/client_channel/lb_policy.h (4 lines changed)
  4. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (133 lines changed)
  5. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (11 lines changed)
  6. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (11 lines changed)
  7. test/cpp/end2end/grpclb_end2end_test.cc (143 lines changed)

@@ -296,7 +296,7 @@ typedef struct {
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the grpclb load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. Default value is 10000. */
If 0, fallback will never be used. */
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \

@@ -118,8 +118,7 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
policy->vtable->set_reresolve_closure_locked(policy, request_reresolution);
}
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
@@ -134,8 +133,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_lb_trace->name(), policy, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
grpc_lb_trace->name(), policy);
}
}
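Review note: the one-shot contract behind this logging change is that grpc_lb_policy_try_reresolve fires the stored closure at most once and then clears it, so a null request_reresolution here simply means a request is already in flight. A condensed sketch of the whole function, reconstructed from this hunk (the exact wording of the success-path log may differ):

    void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
                                      grpc_core::TraceFlag* grpc_lb_trace,
                                      grpc_error* error) {
      if (policy->request_reresolution != nullptr) {
        GRPC_CLOSURE_SCHED(policy->request_reresolution, error);  // fire once...
        policy->request_reresolution = nullptr;                   // ...then clear
        if (grpc_lb_trace->enabled()) {
          gpr_log(GPR_DEBUG, "%s %p: scheduling re-resolution closure with error=%s.",
                  grpc_lb_trace->name(), policy, grpc_error_string(error));
        }
      } else {
        if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
          gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
                  grpc_lb_trace->name(), policy);
        }
      }
    }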

@@ -107,6 +107,10 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
/** \see grpc_lb_policy_set_reresolve_closure */
void (*set_reresolve_closure_locked)(grpc_lb_policy* policy,
grpc_closure* request_reresolution);
};
#ifndef NDEBUG
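Review note: on the caller side, the owning channel installs its re-resolution closure once, right after creating the policy; with this hunk the call is routed through the policy's vtable. A minimal sketch of that wiring; channel_data and on_request_reresolution are invented stand-ins for illustration, not the actual client_channel code:

    // Hypothetical owner of an LB policy (names invented for illustration).
    typedef struct channel_data {
      grpc_lb_policy* lb_policy;
      grpc_closure on_request_reresolution;  // re-runs the resolver when fired
    } channel_data;

    static void install_reresolution_hook(channel_data* chand) {
      // Dispatches to policy->vtable->set_reresolve_closure_locked().
      grpc_lb_policy_set_reresolve_closure_locked(
          chand->lb_policy, &chand->on_request_reresolution);
    }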

@@ -249,7 +249,7 @@ typedef struct glb_lb_policy {
/** the RR policy to use for the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
/** the connectivity state of the embedded RR policy */
grpc_closure on_rr_connectivity_changed;
grpc_connectivity_state rr_connectivity_state;
bool started_picking;
@@ -292,12 +292,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
/** called upon changes to the RR's connectivity. */
grpc_closure rr_on_connectivity_changed;
/** called upon reresolution request from the RR policy. */
grpc_closure rr_on_reresolution_requested;
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@@ -596,8 +590,9 @@ static grpc_lb_addresses* extract_backend_addresses_locked(
return backend_addresses;
}
static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
grpc_error* rr_state_error) {
static void update_lb_connectivity_status_locked(
glb_lb_policy* glb_policy, grpc_connectivity_state rr_state,
grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new
@@ -629,7 +624,7 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
*
* (*) This function must not be called while shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
switch (glb_policy->rr_connectivity_state) {
switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
@@ -643,12 +638,11 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
gpr_log(
GPR_INFO,
"[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
glb_policy,
grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
glb_policy, grpc_connectivity_state_name(rr_state),
glb_policy->rr_policy);
}
grpc_connectivity_state_set(&glb_policy->state_tracker,
glb_policy->rr_connectivity_state, rr_state_error,
grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
rr_state_error,
"update_lb_connectivity_status_locked");
}
@@ -746,36 +740,11 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"rr_on_reresolution_requested_locked");
return;
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
glb_policy, glb_policy->rr_policy);
}
// If we are talking to a balancer, we expect to get updated addresses from
// the balancer, so we can ignore the re-resolution request from the RR
// policy. Otherwise, handle the re-resolution request using glb's original
// re-resolution closure.
if (glb_policy->lb_calld == nullptr ||
!glb_policy->lb_calld->seen_initial_response) {
grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
GRPC_ERROR_NONE);
}
// Give back the wrapper closure to the RR policy.
grpc_lb_policy_set_reresolve_closure_locked(
glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
}
static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == nullptr) {
gpr_log(GPR_ERROR,
@@ -788,25 +757,29 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
grpc_lb_policy_set_reresolve_closure_locked(
new_rr_policy, &glb_policy->rr_on_reresolution_requested);
new_rr_policy, glb_policy->base.request_reresolution);
glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
update_lb_connectivity_status_locked(glb_policy, rr_state_error);
update_lb_connectivity_status_locked(
glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
on_rr_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&glb_policy->rr_on_connectivity_changed);
&glb_policy->on_rr_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
// Send pending picks to RR policy.
pending_pick* pp;
@@ -854,18 +827,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
if (glb_policy->shutting_down) {
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"rr_on_connectivity_changed_locked");
GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. */
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr;
GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
// Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
/* rr state != SHUTDOWN && !glb_policy->shutting_down: business as usual */
update_lb_connectivity_status_locked(
glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&glb_policy->rr_on_connectivity_changed);
&glb_policy->on_rr_connectivity_changed);
}
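Review note: the resubscription above is core's usual one-shot connectivity-watch idiom: the callback re-arms the watch with the same state storage and closure, and instead drops its ref when shutting down. Distilled below; parent_policy is a hypothetical type standing in for glb_lb_policy:

    static void on_child_connectivity_changed(void* arg, grpc_error* error) {
      parent_policy* p = (parent_policy*)arg;  // hypothetical parent type
      if (p->shutting_down) {
        GRPC_LB_POLICY_UNREF(&p->base, "child_connectivity_cb");  // release watch ref
        return;
      }
      // ... react to the new value of p->child_connectivity_state ...
      grpc_lb_policy_notify_on_state_change_locked(  // re-arm the one-shot watch
          p->child_policy, &p->child_connectivity_state,
          &p->on_child_connectivity_changed);
    }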
static void destroy_balancer_name(void* balancer_name) {
@@ -988,6 +971,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else {
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -999,7 +984,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
// Clear pending picks.
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
@@ -1639,8 +1623,6 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
lb_calld, lb_calld->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
GRPC_ERROR_NONE);
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
@@ -1669,15 +1651,16 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Falling back to use backends from resolver",
glb_policy);
if (glb_policy->serverlist == nullptr) {
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Falling back to use backends from resolver",
glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
rr_handover_locked(glb_policy);
}
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
rr_handover_locked(glb_policy);
}
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
@@ -1798,6 +1781,19 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
}
}
static void glb_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy);
GPR_ASSERT(!glb_policy->shutting_down);
GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
request_reresolution);
} else {
glb_policy->base.request_reresolution = request_reresolution;
}
}
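Review note: taken together with the create_rr_locked change above, the net effect of this new vtable hook is that the channel's closure is parked on glb until an RR child exists and is then handed off exactly once. Condensed from the hunks in this file, not verbatim source:

    // While no RR child exists, park the channel's closure on glb itself:
    glb_policy->base.request_reresolution = request_reresolution;

    // Later, in create_rr_locked(), forward the parked closure to the new
    // RR child and clear glb's slot so ownership transfers exactly once:
    grpc_lb_policy_set_reresolve_closure_locked(
        new_rr_policy, glb_policy->base.request_reresolution);
    glb_policy->base.request_reresolution = nullptr;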
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1809,7 +1805,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
glb_update_locked};
glb_update_locked,
glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
@@ -1890,12 +1887,6 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
return nullptr;
}
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
rr_on_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
rr_on_reresolution_requested_locked, glb_policy,
grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));

@@ -520,6 +520,14 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
static void pf_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy);
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -530,7 +538,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
pf_update_locked};
pf_update_locked,
pf_set_reresolve_closure_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}

@@ -620,6 +620,14 @@ static void rr_update_locked(grpc_lb_policy* policy,
}
}
static void rr_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy);
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -630,7 +638,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
rr_update_locked};
rr_update_locked,
rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}

@@ -192,7 +192,6 @@ class BalancerServiceImpl : public BalancerService {
gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
request.DebugString().c_str());
// TODO(juanlishen): Initial response should always be the first response.
if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response;
initial_response.mutable_initial_response()
@@ -445,7 +444,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
void WaitForBackend(size_t backend_idx) {
do {
SendRpc();
CheckRpcSendOk();
} while (backends_[backend_idx]->request_count() == 0);
ResetBackendCounters();
}
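Review note: the helper this loop now calls asserts success instead of silently ignoring failures. A condensed sketch of CheckRpcSendOk as it appears elsewhere in this test file (the repeat-count parameter and message check are assumed from the surrounding code):

    void CheckRpcSendOk(const size_t times = 1) {
      for (size_t i = 0; i < times; ++i) {
        EchoResponse response;
        const Status status = SendRpc(&response);
        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                                 << " message=" << status.error_message();
        EXPECT_EQ(response.message(), kRequestMessage_);
      }
    }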
@@ -664,6 +663,9 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent two responses.
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, Fallback) {
@@ -872,6 +874,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on.
CheckRpcSendFailure();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
class UpdatesTest : public GrpclbEnd2endTest {
@@ -935,6 +939,8 @@ TEST_F(UpdatesTest, UpdateBalancers) {
EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
// Send an update with the same set of LBs as the one in SetUp() in order to
@@ -1006,6 +1012,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// doesn't assign the second backend.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
@@ -1088,134 +1097,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
}
TEST_F(UpdatesTest, ReresolveDeadBalancer) {
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
SetNextResolution(addresses);
addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
SetNextReresolutionResponse(addresses);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
// Kill backend 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
CheckRpcSendFailure();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Kill balancer 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
balancers_[0]->NotifyDoneWithServerlists();
if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
// Balancer 0 got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
// Balancer 1 may have received a request if re-resolution is done quickly
// enough.
EXPECT_GE(balancer_servers_[1].service_->request_count(), 0U);
EXPECT_GE(balancer_servers_[1].service_->response_count(), 0U);
EXPECT_LE(balancer_servers_[1].service_->request_count(), 1U);
EXPECT_LE(balancer_servers_[1].service_->response_count(), 1U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Wait until re-resolution has finished, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
// This is serviced by the new serverlist.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
}
TEST_F(UpdatesTest, ReresolveDeadBackend) {
ResetStub(500);
// The first resolution contains the addresses of a balancer that never
// responds, and a fallback backend.
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""});
SetNextResolution(addresses);
// The re-resolution result will contain the addresses of the same balancer
// and a new fallback backend.
addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
addresses.emplace_back(AddressData{backend_servers_[1].port_, false, ""});
SetNextReresolutionResponse(addresses);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// All 10 requests should have gone to the fallback backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
// Kill backend 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
// Wait until re-resolution has finished, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[0].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, Drop) {
