lazy resolving lb policy creation

pull/19588/head
Qiancheng Zhao 5 years ago
parent 030c399a05
commit 444806583d
  1. 100
      src/core/ext/filters/client_channel/client_channel.cc
  2. 1
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  3. 59
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  4. 15
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  5. 4
      test/cpp/end2end/grpclb_end2end_test.cc

@ -210,6 +210,10 @@ class ChannelData {
ChannelData(grpc_channel_element_args* args, grpc_error** error);
~ChannelData();
void CreateResolvingLoadBalancingPolicyLocked();
void DestroyResolvingLoadBalancingPolicyLocked();
static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
@ -235,8 +239,10 @@ class ChannelData {
const size_t per_rpc_retry_buffer_size_;
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
UniquePtr<char> server_name_;
const grpc_channel_args* channel_args_;
RefCountedPtr<ServiceConfig> default_service_config_;
UniquePtr<char> server_name_;
UniquePtr<char> target_uri_;
channelz::ChannelNode* channelz_node_;
//
@ -1226,59 +1232,61 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
grpc_channel_args* new_args = nullptr;
grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
&new_args);
UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
: gpr_strdup(server_uri));
target_uri_.reset(proxy_name != nullptr ? proxy_name
: gpr_strdup(server_uri));
channel_args_ = new_args != nullptr
? new_args
: grpc_channel_args_copy(args->channel_args);
if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
*error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid.");
return;
}
*error = GRPC_ERROR_NONE;
}
ChannelData::~ChannelData() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
}
DestroyResolvingLoadBalancingPolicyLocked();
grpc_channel_args_destroy(channel_args_);
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&state_tracker_);
gpr_mu_destroy(&info_mu_);
}
void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
// Instantiate resolving LB policy.
LoadBalancingPolicy::Args lb_args;
lb_args.combiner = combiner_;
lb_args.channel_control_helper =
UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
New<ClientChannelControlHelper>(this));
lb_args.args = new_args != nullptr ? new_args : args->channel_args;
lb_args.args = channel_args_;
UniquePtr<char> target_uri(strdup(target_uri_.get()));
resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
std::move(lb_args), &grpc_client_channel_routing_trace,
std::move(target_uri), ProcessResolverResultLocked, this, error));
grpc_channel_args_destroy(new_args);
if (*error != GRPC_ERROR_NONE) {
// Orphan the resolving LB policy and flush the exec_ctx to ensure
// that it finishes shutting down. This ensures that if we are
// failing, we destroy the ClientChannelControlHelper (and thus
// unref the channel stack) before we return.
// TODO(roth): This is not a complete solution, because it only
// catches the case where channel stack initialization fails in this
// particular filter. If there is a failure in a different filter, we
// will leave a dangling ref here, which can cause a crash. Fortunately,
// in practice, there are no other filters that can cause failures in
// channel stack initialization, so this works for now.
resolving_lb_policy_.reset();
ExecCtx::Get()->Flush();
} else {
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
resolving_lb_policy_.get());
}
std::move(target_uri), ProcessResolverResultLocked, this));
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
resolving_lb_policy_.get());
}
}
ChannelData::~ChannelData() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
}
void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
if (resolving_lb_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
resolving_lb_policy_.reset();
}
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&state_tracker_);
gpr_mu_destroy(&info_mu_);
}
void ChannelData::ProcessLbPolicy(
@ -1500,10 +1508,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set(
chand->resolving_lb_policy_->interested_parties(),
chand->interested_parties_);
chand->resolving_lb_policy_.reset();
chand->DestroyResolvingLoadBalancingPolicyLocked();
// Will delete itself.
New<ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
@ -1574,6 +1579,8 @@ void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
auto* chand = static_cast<ChannelData*>(arg);
if (chand->resolving_lb_policy_ != nullptr) {
chand->resolving_lb_policy_->ExitIdleLocked();
} else {
chand->CreateResolvingLoadBalancingPolicyLocked();
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
}
@ -3463,6 +3470,15 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(calld->connected_subchannel_ == nullptr);
GPR_ASSERT(calld->subchannel_call_ == nullptr);
// picker's being null means the channel is currently in IDLE state. The
// incoming call will make the channel exit IDLE and queue itself.
if (chand->picker() == nullptr) {
// We are currently in the data plane.
// Bounce into the control plane to exit IDLE.
chand->CheckConnectivityState(true);
calld->AddCallToQueuedPicksLocked(elem);
return;
}
// Apply service config to call if needed.
calld->MaybeApplyServiceConfigToCallLocked(elem);
// If this is a retry, use the send_initial_metadata payload that

@ -196,7 +196,6 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
grpc_combiner_scheduler(resolver_->combiner())),
GRPC_ERROR_NONE);
} else {
GPR_ASSERT(!has_result_);
has_result_ = true;
result_ = std::move(result);
}

@ -181,52 +181,29 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
// ResolvingLoadBalancingPolicy
//
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config,
grpc_error** error)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
child_policy_name_(std::move(child_policy_name)),
child_lb_config_(std::move(child_lb_config)) {
GPR_ASSERT(child_policy_name_ != nullptr);
// Don't fetch service config, since this ctor is for use in nested LB
// policies, not at the top level, and we only fetch the service
// config at the top level.
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &arg, 1);
*error = Init(*new_args);
grpc_channel_args_destroy(new_args);
}
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
ProcessResolverResultCallback process_resolver_result,
void* process_resolver_result_user_data, grpc_error** error)
void* process_resolver_result_user_data)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
process_resolver_result_(process_resolver_result),
process_resolver_result_user_data_(process_resolver_result_user_data) {
GPR_ASSERT(process_resolver_result != nullptr);
*error = Init(*args.args);
}
grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), &args, interested_parties(), combiner(),
target_uri_.get(), args.args, interested_parties(), combiner(),
UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
if (resolver_ == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
// Since the validity of args has been checked when create the channel,
// CreateResolver() must return a non-null result.
GPR_ASSERT(resolver_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
}
// Return our picker to the channel.
channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
return GRPC_ERROR_NONE;
GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
resolver_->StartLocked();
}
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
@ -262,10 +239,6 @@ void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
} else {
if (!started_resolving_ && resolver_ != nullptr) {
StartResolvingLocked();
}
}
}
@ -278,18 +251,6 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
}
void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
}
GPR_ASSERT(!started_resolving_);
started_resolving_ = true;
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
resolver_->StartLocked();
}
void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
if (resolver_ == nullptr) {
GRPC_ERROR_UNREF(error);

@ -51,16 +51,6 @@ namespace grpc_core {
// child LB policy and config to use.
class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
// If error is set when this returns, then construction failed, and
// the caller may not use the new object.
ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name,
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config,
grpc_error** error);
// Private ctor, to be used by client_channel only!
//
// Synchronous callback that takes the resolver result and sets
// lb_policy_name and lb_policy_config to point to the right data.
// Returns true if the service config has changed since the last result.
@ -77,7 +67,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
ProcessResolverResultCallback process_resolver_result,
void* process_resolver_result_user_data, grpc_error** error);
void* process_resolver_result_user_data);
virtual const char* name() const override { return "resolving_lb"; }
@ -98,10 +88,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
~ResolvingLoadBalancingPolicy();
grpc_error* Init(const grpc_channel_args& args);
void ShutdownLocked() override;
void StartResolvingLocked();
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
@ -126,7 +114,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;
bool started_resolving_ = false;
bool previous_resolution_contained_addresses_ = false;
// Child LB policy.

@ -1618,6 +1618,8 @@ TEST_F(UpdatesTest, ReresolveDeadBackend) {
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
SetNextResolution(addresses);
// Ask channel to connect to trigger resolver creation.
channel_->GetState(true);
// The re-resolution result will contain the addresses of the same balancer
// and a new fallback backend.
addresses.clear();
@ -1669,6 +1671,8 @@ class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest {
};
TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
// Ask channel to connect to trigger resolver creation.
channel_->GetState(true);
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
SetNextResolution(addresses);

Loading…
Cancel
Save