grpclb in charge of its re-resolution

pull/13671/head
Juanli Shen 7 years ago
parent e17592c929
commit 33cdd57ab7
  1. 2
      include/grpc/impl/codegen/grpc_types.h
  2. 7
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 4
      src/core/ext/filters/client_channel/lb_policy.h
  4. 117
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 11
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 11
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 161
      test/cpp/end2end/grpclb_end2end_test.cc

@ -296,7 +296,7 @@ typedef struct {
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the grpclb load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. */
If 0, fallback will never be used. Default value is 10000. */
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \

@ -118,7 +118,8 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
policy->vtable->set_reresolve_closure_locked(policy, request_reresolution);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
@ -133,8 +134,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_lb_trace->name(), policy, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
grpc_lb_trace->name(), policy);
}
}

@ -107,10 +107,6 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
/** \see grpc_lb_policy_set_reresolve_closure */
void (*set_reresolve_closure_locked)(grpc_lb_policy* policy,
grpc_closure* request_reresolution);
};
#ifndef NDEBUG

@ -247,7 +247,7 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
grpc_closure on_rr_connectivity_changed;
/** the connectivity state of the embedded RR policy */
grpc_connectivity_state rr_connectivity_state;
bool started_picking;
@ -290,6 +290,12 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
/** called upon changes to the RR's connectivity. */
grpc_closure rr_on_connectivity_changed;
/** called upon reresolution request from the RR policy. */
grpc_closure rr_on_reresolution_requested;
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@ -584,8 +590,7 @@ static grpc_lb_addresses* extract_backend_addresses_locked(
return backend_addresses;
}
static void update_lb_connectivity_status_locked(
glb_lb_policy* glb_policy, grpc_connectivity_state rr_state,
static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@ -618,7 +623,7 @@ static void update_lb_connectivity_status_locked(
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
switch (rr_state) {
switch (glb_policy->rr_connectivity_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
@ -632,11 +637,12 @@ static void update_lb_connectivity_status_locked(
gpr_log(
GPR_INFO,
"[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
glb_policy, grpc_connectivity_state_name(rr_state),
glb_policy,
grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
glb_policy->rr_policy);
}
grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
rr_state_error,
grpc_connectivity_state_set(&glb_policy->state_tracker,
glb_policy->rr_connectivity_state, rr_state_error,
"update_lb_connectivity_status_locked");
}
@ -733,11 +739,36 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"rr_on_reresolution_requested_locked");
return;
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
glb_policy, glb_policy->rr_policy);
}
// If we are talking to a balancer, we expect to get updated addresses form
// the balancer, so we can ignore the re-resolution request from the RR
// policy. Otherwise, handle the re-resolution request using glb's original
// re-resolution closure.
if (glb_policy->lb_calld == nullptr ||
!glb_policy->lb_calld->seen_initial_response) {
grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
GRPC_ERROR_NONE);
}
// Give back the wrapper closure to the RR policy.
grpc_lb_policy_set_reresolve_closure_locked(
glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
}
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == nullptr) {
gpr_log(GPR_ERROR,
@ -750,29 +781,25 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
grpc_lb_policy_set_reresolve_closure_locked(
new_rr_policy, glb_policy->base.request_reresolution);
glb_policy->base.request_reresolution = nullptr;
new_rr_policy, &glb_policy->rr_on_reresolution_requested);
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
update_lb_connectivity_status_locked(
glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
update_lb_connectivity_status_locked(glb_policy, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
on_rr_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&glb_policy->on_rr_connectivity_changed);
&glb_policy->rr_on_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
// Send pending picks to RR policy.
pending_pick* pp;
@ -820,28 +847,18 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. .*/
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr;
GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"rr_on_connectivity_changed_locked");
return;
}
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
update_lb_connectivity_status_locked(
glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
// Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&glb_policy->on_rr_connectivity_changed);
&glb_policy->rr_on_connectivity_changed);
}
static void destroy_balancer_name(void* balancer_name) {
@ -963,8 +980,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else {
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@ -976,6 +991,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
// Clear pending picks.
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
@ -1614,6 +1630,8 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
lb_calld, lb_calld->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
GRPC_ERROR_NONE);
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
@ -1642,8 +1660,8 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
if (glb_policy->serverlist == nullptr) {
if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Falling back to use backends from resolver",
@ -1652,7 +1670,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
rr_handover_locked(glb_policy);
}
}
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
@ -1773,19 +1790,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
}
}
static void glb_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
GPR_ASSERT(!glb_policy->shutting_down);
GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
request_reresolution);
} else {
glb_policy->base.request_reresolution = request_reresolution;
}
}
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@ -1797,8 +1801,7 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
glb_update_locked,
glb_set_reresolve_closure_locked};
glb_update_locked};
static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
@ -1878,6 +1881,12 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
return nullptr;
}
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
rr_on_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
rr_on_reresolution_requested_locked, glb_policy,
grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));

@ -519,14 +519,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
static void pf_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@ -537,8 +529,7 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
pf_update_locked,
pf_set_reresolve_closure_locked};
pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}

@ -595,14 +595,6 @@ static void rr_update_locked(grpc_lb_policy* policy,
}
}
static void rr_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@ -613,8 +605,7 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
rr_update_locked,
rr_set_reresolve_closure_locked};
rr_update_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}

@ -191,6 +191,7 @@ class BalancerServiceImpl : public BalancerService {
gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
request.DebugString().c_str());
// TODO(juanlishen): Initial response should always be the first response.
if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response;
initial_response.mutable_initial_response()
@ -443,7 +444,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
void WaitForBackend(size_t backend_idx) {
do {
CheckRpcSendOk();
SendRpc();
} while (backends_[backend_idx]->request_count() == 0);
ResetBackendCounters();
}
@ -663,9 +664,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent two responses.
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(SingleBalancerTest, Fallback) {
@ -874,8 +872,6 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on.
CheckRpcSendFailure();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
class UpdatesTest : public GrpclbEnd2endTest {
@ -939,8 +935,6 @@ TEST_F(UpdatesTest, UpdateBalancers) {
EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
// Send an update with the same set of LBs as the one in SetUp() in order to
@ -1012,9 +1006,6 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// doesn't assign the second backend.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
@ -1097,8 +1088,152 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
TEST_F(UpdatesTest, ReresolveDeadBalancer) {
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
SetNextResolution(addresses);
addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
SetNextResolutionUponError(addresses);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
// Kill backend 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
CheckRpcSendFailure();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Kill balancer 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
balancers_[0]->NotifyDoneWithServerlists();
if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
// Balancer 0 got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
// Balancer 1 may have received a request if re-resolution is done quickly
// enough.
EXPECT_GE(balancer_servers_[1].service_->request_count(), 0U);
EXPECT_GE(balancer_servers_[1].service_->response_count(), 0U);
EXPECT_LE(balancer_servers_[1].service_->request_count(), 1U);
EXPECT_LE(balancer_servers_[1].service_->response_count(), 1U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Wait until re-resolution has finished, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
// This is serviced by the new serverlist.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
}
TEST_F(UpdatesTest, ReresolveDeadBackend) {
ResetStub(500);
// The first resolution contains the addresses of a balancer that never
// responds, and a fallback backend.
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""});
SetNextResolution(addresses);
// The re-resolution result will contain a balancer address.
addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
SetNextResolutionUponError(addresses);
const std::vector<int> second_backend{backend_servers_[1].port_};
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// All 10 requests should have gone to the fallback backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
// Kill backend 0.
gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown();
gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
// Balancer 0 got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// but didn't send any response.
EXPECT_EQ(0U, balancer_servers_[0].service_->response_count());
// Balancer 1 may have received a request if re-resolution is done quickly
// enough.
EXPECT_GE(balancer_servers_[1].service_->request_count(), 0U);
EXPECT_GE(balancer_servers_[1].service_->response_count(), 0U);
EXPECT_LE(balancer_servers_[1].service_->request_count(), 1U);
EXPECT_LE(balancer_servers_[1].service_->response_count(), 1U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Wait until re-resolution has finished, as signaled by the second backend
// receiving a request.
WaitForBackend(1);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
balancers_[0]->NotifyDoneWithServerlists();
balancers_[1]->NotifyDoneWithServerlists();
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[0].service_->response_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
}
TEST_F(SingleBalancerTest, Drop) {

Loading…
Cancel
Save