diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index a6a7d3697b6..0ed90e85673 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -212,7 +212,11 @@ struct ClientStats { class BalancerServiceImpl : public BalancerService { public: using Stream = ServerReaderWriter; - using ResponseDelayPair = std::pair; + struct ResponseConfig { + LoadBalanceResponse response; + int delay_ms; + std::string for_target; + }; explicit BalancerServiceImpl(int client_load_reporting_interval_seconds) : client_load_reporting_interval_seconds_( @@ -229,10 +233,15 @@ class BalancerServiceImpl : public BalancerService { EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), context->client_metadata().end()); LoadBalanceRequest request; - std::vector responses_and_delays; + std::string target; + std::vector response_configs; if (!stream->Read(&request)) { goto done; + } else { + if (request.has_initial_request()) { + target = request.initial_request().name(); + } } IncreaseRequestCount(); gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this, @@ -249,11 +258,14 @@ class BalancerServiceImpl : public BalancerService { { grpc::internal::MutexLock lock(&mu_); - responses_and_delays = responses_and_delays_; + response_configs = response_configs_; } - for (const auto& response_and_delay : responses_and_delays) { - SendResponse(stream, response_and_delay.first, - response_and_delay.second); + for (const auto& response_config : response_configs) { + if (response_config.for_target.empty() || + response_config.for_target == target) { + SendResponse(stream, response_config.response, + response_config.delay_ms); + } } { grpc::internal::MutexLock lock(&mu_); @@ -295,16 +307,16 @@ class BalancerServiceImpl : public BalancerService { return Status::OK; } - void add_response(const LoadBalanceResponse& response, int send_after_ms) { + void add_response(const LoadBalanceResponse& response, int send_after_ms, + std::string for_target = "") { grpc::internal::MutexLock lock(&mu_); - responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); + response_configs_.push_back({response, send_after_ms, for_target}); } void Start() { grpc::internal::MutexLock lock(&mu_); serverlist_done_ = false; - responses_and_delays_.clear(); - load_report_queue_.clear(); + response_configs_.clear(); } void Shutdown() { @@ -372,7 +384,7 @@ class BalancerServiceImpl : public BalancerService { } const int client_load_reporting_interval_seconds_; - std::vector responses_and_delays_; + std::vector response_configs_; grpc::internal::Mutex mu_; grpc::internal::CondVar serverlist_cond_; @@ -613,8 +625,8 @@ class GrpclbEnd2endTest : public ::testing::Test { void ScheduleResponseForBalancer(size_t i, const LoadBalanceResponse& response, - int delay_ms) { - balancers_[i]->service_.add_response(response, delay_ms); + int delay_ms, std::string target = "") { + balancers_[i]->service_.add_response(response, delay_ms, target); } Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000, @@ -1383,6 +1395,29 @@ TEST_F(SingleBalancerTest, BackendsRestart) { EXPECT_EQ(1U, balancers_[0]->service_.response_count()); } +TEST_F(SingleBalancerTest, TargetFromLbPolicyConfig) { + constexpr char kServiceConfigWithTarget[] = + "{\n" + " \"loadBalancingConfig\":[\n" + " { \"grpclb\":{\n" + " \"targetName\":\"test_target\"\n" + " }}\n" + " ]\n" + "}"; + + SetNextResolutionAllBalancers(kServiceConfigWithTarget); + const size_t kNumRpcsPerAddress = 1; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0, "test_target"); + // Make sure that trying to connect works without a call. + channel_->GetState(true /* try_to_connect */); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); +} + class UpdatesTest : public GrpclbEnd2endTest { public: UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}