Merge pull request #22345 from muxi/grpclb-name-config

Update grpclb configuration with field "service_name"
pull/23435/head
Muxi Yan 4 years ago committed by GitHub
commit 3b2e473548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 33
      test/cpp/end2end/grpclb_end2end_test.cc

@ -131,16 +131,21 @@ constexpr char kGrpclb[] = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config {
public:
explicit GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
: child_policy_(std::move(child_policy)) {}
GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string service_name)
: child_policy_(std::move(child_policy)),
service_name_(std::move(service_name)) {}
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
const std::string& service_name() const { return service_name_; }
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string service_name_;
};
class GrpcLb : public LoadBalancingPolicy {
@ -370,6 +375,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
// Configurations for the policy.
RefCountedPtr<GrpcLbConfig> config_;
// Current channel args from the resolver.
grpc_channel_args* args_ = nullptr;
@ -415,8 +422,6 @@ class GrpcLb : public LoadBalancingPolicy {
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// The child policy config.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
// Child policy in state READY.
bool child_policy_ready_ = false;
};
@ -760,8 +765,11 @@ GrpcLb::BalancerCallState::BalancerCallState(
nullptr, deadline, nullptr);
// Init the LB call request payload.
upb::Arena arena;
grpc_slice request_payload_slice =
GrpcLbRequestCreate(grpclb_policy()->server_name_, arena.ptr());
grpc_slice request_payload_slice = GrpcLbRequestCreate(
grpclb_policy()->config_->service_name().empty()
? grpclb_policy()->server_name_
: grpclb_policy()->config_->service_name().c_str(),
arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
@ -1387,12 +1395,8 @@ void GrpcLb::ResetBackoffLocked() {
void GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
auto* grpclb_config = static_cast<const GrpcLbConfig*>(args.config.get());
if (grpclb_config != nullptr) {
child_policy_config_ = grpclb_config->child_policy();
} else {
child_policy_config_ = nullptr;
}
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
// Update the existing child policy.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
@ -1646,7 +1650,7 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
update_args.config = config_->child_policy();
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(update_args.args);
@ -1676,12 +1680,23 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
return MakeRefCounted<GrpcLbConfig>(nullptr);
return MakeRefCounted<GrpcLbConfig>(nullptr, "");
}
std::vector<grpc_error*> error_list;
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
auto it = json.object_value().find("childPolicy");
std::string service_name;
auto it = json.object_value().find("serviceName");
if (it != json.object_value().end()) {
const Json& service_name_json = it->second;
if (service_name_json.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:serviceName error:type should be string"));
} else {
service_name = service_name_json.string_value();
}
}
it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
child_policy_config_json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
@ -1701,7 +1716,8 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
if (error_list.empty()) {
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config));
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config),
std::move(service_name));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;

@ -236,6 +236,11 @@ class BalancerServiceImpl : public BalancerService {
if (!stream->Read(&request)) {
goto done;
} else {
if (request.has_initial_request()) {
grpc::internal::MutexLock lock(&mu_);
service_names_.push_back(request.initial_request().name());
}
}
IncreaseRequestCount();
gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
@ -359,6 +364,11 @@ class BalancerServiceImpl : public BalancerService {
}
}
std::vector<std::string> service_names() {
grpc::internal::MutexLock lock(&mu_);
return service_names_;
}
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
@ -374,6 +384,7 @@ class BalancerServiceImpl : public BalancerService {
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::vector<std::string> service_names_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar serverlist_cond_;
@ -1382,6 +1393,28 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
EXPECT_EQ(1U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
constexpr char kServiceConfigWithTarget[] =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"grpclb\":{\n"
" \"serviceName\":\"test_service\"\n"
" }}\n"
" ]\n"
"}";
SetNextResolutionAllBalancers(kServiceConfigWithTarget);
const size_t kNumRpcsPerAddress = 1;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends();
EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service");
}
class UpdatesTest : public GrpclbEnd2endTest {
public:
UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}

Loading…
Cancel
Save