Address comments

reviewable/pr22345/r3
Muxi Yan 5 years ago
parent ec2cd96426
commit fa28bab456
  1. 41
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 54
      test/cpp/end2end/grpclb_end2end_test.cc

@ -131,20 +131,20 @@ constexpr char kGrpclb[] = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config {
public:
GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string target_name)
std::string service_name)
: child_policy_(std::move(child_policy)),
target_name_(std::move(target_name)) {}
service_name_(std::move(service_name)) {}
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
const std::string& target_name() const { return target_name_; }
const std::string& service_name() const { return service_name_; }
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string target_name_;
std::string service_name_;
};
class GrpcLb : public LoadBalancingPolicy {
@ -374,9 +374,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
// The target name from configuration; if set, it overrides server_name_ in
// the balancer requests.
std::string target_name_;
// Configurations for the policy.
RefCountedPtr<GrpcLbConfig> config_;
// Current channel args from the resolver.
grpc_channel_args* args_ = nullptr;
@ -422,8 +421,6 @@ class GrpcLb : public LoadBalancingPolicy {
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// The child policy config.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
// Child policy in state READY.
bool child_policy_ready_ = false;
};
@ -769,10 +766,10 @@ GrpcLb::BalancerCallState::BalancerCallState(
nullptr, deadline, nullptr);
// Init the LB call request payload.
upb::Arena arena;
grpc_slice request_payload_slice =
GrpcLbRequestCreate(grpclb_policy()->target_name_.empty()
grpc_slice request_payload_slice = GrpcLbRequestCreate(
grpclb_policy()->config_->service_name().empty()
? grpclb_policy()->server_name_
: grpclb_policy()->target_name_.c_str(),
: grpclb_policy()->config_->service_name().c_str(),
arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
@ -1399,10 +1396,8 @@ void GrpcLb::ResetBackoffLocked() {
void GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
auto* grpclb_config = static_cast<const GrpcLbConfig*>(args.config.get());
GPR_ASSERT(grpclb_config != nullptr);
child_policy_config_ = grpclb_config->child_policy();
target_name_ = grpclb_config->target_name();
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
// Update the existing child policy.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
@ -1657,7 +1652,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
update_args.config = config_->child_policy();
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(update_args.args);
@ -1687,20 +1682,20 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
return MakeRefCounted<GrpcLbConfig>(nullptr, nullptr);
return MakeRefCounted<GrpcLbConfig>(nullptr, "");
}
std::vector<grpc_error*> error_list;
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
std::string target_name;
std::string service_name;
auto it = json.object_value().find("serviceName");
if (it != json.object_value().end()) {
const Json& target_name_json = it->second;
if (target_name_json.type() != Json::Type::STRING) {
const Json& service_name_json = it->second;
if (service_name_json.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:serviceName error:type should be string"));
} else {
target_name = target_name_json.string_value();
service_name = service_name_json.string_value();
}
}
it = json.object_value().find("childPolicy");
@ -1724,7 +1719,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
}
if (error_list.empty()) {
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config),
target_name);
std::move(service_name));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;

@ -212,11 +212,7 @@ struct ClientStats {
class BalancerServiceImpl : public BalancerService {
public:
using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
struct ResponseConfig {
LoadBalanceResponse response;
int delay_ms;
std::string for_target;
};
using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
: client_load_reporting_interval_seconds_(
@ -233,14 +229,14 @@ class BalancerServiceImpl : public BalancerService {
EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
context->client_metadata().end());
LoadBalanceRequest request;
std::string target;
std::vector<ResponseConfig> response_configs;
std::vector<ResponseDelayPair> responses_and_delays;
if (!stream->Read(&request)) {
goto done;
} else {
if (request.has_initial_request()) {
target = request.initial_request().name();
grpc::internal::MutexLock lock(&mu_);
service_names_.push_back(request.initial_request().name());
}
}
IncreaseRequestCount();
@ -258,14 +254,11 @@ class BalancerServiceImpl : public BalancerService {
{
grpc::internal::MutexLock lock(&mu_);
response_configs = response_configs_;
}
for (const auto& response_config : response_configs) {
if (response_config.for_target.empty() ||
response_config.for_target == target) {
SendResponse(stream, response_config.response,
response_config.delay_ms);
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
SendResponse(stream, response_and_delay.first,
response_and_delay.second);
}
{
grpc::internal::MutexLock lock(&mu_);
@ -307,16 +300,16 @@ class BalancerServiceImpl : public BalancerService {
return Status::OK;
}
void add_response(const LoadBalanceResponse& response, int send_after_ms,
std::string for_target = "") {
void add_response(const LoadBalanceResponse& response, int send_after_ms) {
grpc::internal::MutexLock lock(&mu_);
response_configs_.push_back({response, send_after_ms, for_target});
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
void Start() {
grpc::internal::MutexLock lock(&mu_);
serverlist_done_ = false;
response_configs_.clear();
responses_and_delays_.clear();
load_report_queue_.clear();
}
void Shutdown() {
@ -370,6 +363,11 @@ class BalancerServiceImpl : public BalancerService {
}
}
std::vector<std::string> service_names() {
grpc::internal::MutexLock lock(&mu_);
return service_names_;
}
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
@ -384,7 +382,8 @@ class BalancerServiceImpl : public BalancerService {
}
const int client_load_reporting_interval_seconds_;
std::vector<ResponseConfig> response_configs_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::vector<std::string> service_names_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar serverlist_cond_;
@ -625,8 +624,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
void ScheduleResponseForBalancer(size_t i,
const LoadBalanceResponse& response,
int delay_ms, std::string target = "") {
balancers_[i]->service_.add_response(response, delay_ms, target);
int delay_ms) {
balancers_[i]->service_.add_response(response, delay_ms);
}
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
@ -1395,12 +1394,12 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest, TargetFromLbPolicyConfig) {
TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
constexpr char kServiceConfigWithTarget[] =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"serviceName\":\"test_target\"\n"
" \"serviceName\":\"test_service\"\n"
" }}\n"
" ]\n"
"}";
@ -1409,13 +1408,14 @@ TEST_F(SingleBalancerTest, TargetFromLbPolicyConfig) {
const size_t kNumRpcsPerAddress = 1;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0, "test_target");
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Send an RPC to trigger load balancing.
CheckRpcSendOk();
EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service");
}
class UpdatesTest : public GrpclbEnd2endTest {

Loading…
Cancel
Save