|
|
|
@ -210,6 +210,10 @@ class ChannelData { |
|
|
|
|
ChannelData(grpc_channel_element_args* args, grpc_error** error); |
|
|
|
|
~ChannelData(); |
|
|
|
|
|
|
|
|
|
void CreateResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
|
|
|
|
|
void DestroyResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
|
|
|
|
|
static bool ProcessResolverResultLocked( |
|
|
|
|
void* arg, const Resolver::Result& result, const char** lb_policy_name, |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config, |
|
|
|
@ -235,8 +239,10 @@ class ChannelData { |
|
|
|
|
const size_t per_rpc_retry_buffer_size_; |
|
|
|
|
grpc_channel_stack* owning_stack_; |
|
|
|
|
ClientChannelFactory* client_channel_factory_; |
|
|
|
|
UniquePtr<char> server_name_; |
|
|
|
|
const grpc_channel_args* channel_args_; |
|
|
|
|
RefCountedPtr<ServiceConfig> default_service_config_; |
|
|
|
|
UniquePtr<char> server_name_; |
|
|
|
|
UniquePtr<char> target_uri_; |
|
|
|
|
channelz::ChannelNode* channelz_node_; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1226,33 +1232,47 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) |
|
|
|
|
grpc_channel_args* new_args = nullptr; |
|
|
|
|
grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name, |
|
|
|
|
&new_args); |
|
|
|
|
UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name |
|
|
|
|
target_uri_.reset(proxy_name != nullptr ? proxy_name |
|
|
|
|
: gpr_strdup(server_uri)); |
|
|
|
|
channel_args_ = new_args != nullptr |
|
|
|
|
? new_args |
|
|
|
|
: grpc_channel_args_copy(args->channel_args); |
|
|
|
|
if (!ResolverRegistry::IsValidTarget(target_uri_.get())) { |
|
|
|
|
*error = |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid."); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
*error = GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData::~ChannelData() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: destroying channel", this); |
|
|
|
|
} |
|
|
|
|
DestroyResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
grpc_channel_args_destroy(channel_args_); |
|
|
|
|
// Stop backup polling.
|
|
|
|
|
grpc_client_channel_stop_backup_polling(interested_parties_); |
|
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
|
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel"); |
|
|
|
|
GRPC_COMBINER_UNREF(combiner_, "client_channel"); |
|
|
|
|
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); |
|
|
|
|
grpc_connectivity_state_destroy(&state_tracker_); |
|
|
|
|
gpr_mu_destroy(&info_mu_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { |
|
|
|
|
// Instantiate resolving LB policy.
|
|
|
|
|
LoadBalancingPolicy::Args lb_args; |
|
|
|
|
lb_args.combiner = combiner_; |
|
|
|
|
lb_args.channel_control_helper = |
|
|
|
|
UniquePtr<LoadBalancingPolicy::ChannelControlHelper>( |
|
|
|
|
New<ClientChannelControlHelper>(this)); |
|
|
|
|
lb_args.args = new_args != nullptr ? new_args : args->channel_args; |
|
|
|
|
lb_args.args = channel_args_; |
|
|
|
|
UniquePtr<char> target_uri(strdup(target_uri_.get())); |
|
|
|
|
resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>( |
|
|
|
|
std::move(lb_args), &grpc_client_channel_routing_trace, |
|
|
|
|
std::move(target_uri), ProcessResolverResultLocked, this, error)); |
|
|
|
|
grpc_channel_args_destroy(new_args); |
|
|
|
|
if (*error != GRPC_ERROR_NONE) { |
|
|
|
|
// Orphan the resolving LB policy and flush the exec_ctx to ensure
|
|
|
|
|
// that it finishes shutting down. This ensures that if we are
|
|
|
|
|
// failing, we destroy the ClientChannelControlHelper (and thus
|
|
|
|
|
// unref the channel stack) before we return.
|
|
|
|
|
// TODO(roth): This is not a complete solution, because it only
|
|
|
|
|
// catches the case where channel stack initialization fails in this
|
|
|
|
|
// particular filter. If there is a failure in a different filter, we
|
|
|
|
|
// will leave a dangling ref here, which can cause a crash. Fortunately,
|
|
|
|
|
// in practice, there are no other filters that can cause failures in
|
|
|
|
|
// channel stack initialization, so this works for now.
|
|
|
|
|
resolving_lb_policy_.reset(); |
|
|
|
|
ExecCtx::Get()->Flush(); |
|
|
|
|
} else { |
|
|
|
|
std::move(target_uri), ProcessResolverResultLocked, this)); |
|
|
|
|
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(), |
|
|
|
|
interested_parties_); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
@ -1260,25 +1280,13 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) |
|
|
|
|
resolving_lb_policy_.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData::~ChannelData() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: destroying channel", this); |
|
|
|
|
} |
|
|
|
|
void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() { |
|
|
|
|
if (resolving_lb_policy_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(), |
|
|
|
|
interested_parties_); |
|
|
|
|
resolving_lb_policy_.reset(); |
|
|
|
|
} |
|
|
|
|
// Stop backup polling.
|
|
|
|
|
grpc_client_channel_stop_backup_polling(interested_parties_); |
|
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
|
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel"); |
|
|
|
|
GRPC_COMBINER_UNREF(combiner_, "client_channel"); |
|
|
|
|
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); |
|
|
|
|
grpc_connectivity_state_destroy(&state_tracker_); |
|
|
|
|
gpr_mu_destroy(&info_mu_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::ProcessLbPolicy( |
|
|
|
@ -1500,10 +1508,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { |
|
|
|
|
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong( |
|
|
|
|
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL, |
|
|
|
|
MemoryOrder::ACQUIRE)); |
|
|
|
|
grpc_pollset_set_del_pollset_set( |
|
|
|
|
chand->resolving_lb_policy_->interested_parties(), |
|
|
|
|
chand->interested_parties_); |
|
|
|
|
chand->resolving_lb_policy_.reset(); |
|
|
|
|
chand->DestroyResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
// Will delete itself.
|
|
|
|
|
New<ConnectivityStateAndPickerSetter>( |
|
|
|
|
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API", |
|
|
|
@ -1574,6 +1579,8 @@ void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { |
|
|
|
|
auto* chand = static_cast<ChannelData*>(arg); |
|
|
|
|
if (chand->resolving_lb_policy_ != nullptr) { |
|
|
|
|
chand->resolving_lb_policy_->ExitIdleLocked(); |
|
|
|
|
} else { |
|
|
|
|
chand->CreateResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect"); |
|
|
|
|
} |
|
|
|
@ -3463,6 +3470,15 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
GPR_ASSERT(calld->connected_subchannel_ == nullptr); |
|
|
|
|
GPR_ASSERT(calld->subchannel_call_ == nullptr); |
|
|
|
|
// picker's being null means the channel is currently in IDLE state. The
|
|
|
|
|
// incoming call will make the channel exit IDLE and queue itself.
|
|
|
|
|
if (chand->picker() == nullptr) { |
|
|
|
|
// We are currently in the data plane.
|
|
|
|
|
// Bounce into the control plane to exit IDLE.
|
|
|
|
|
chand->CheckConnectivityState(true); |
|
|
|
|
calld->AddCallToQueuedPicksLocked(elem); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Apply service config to call if needed.
|
|
|
|
|
calld->MaybeApplyServiceConfigToCallLocked(elem); |
|
|
|
|
// If this is a retry, use the send_initial_metadata payload that
|
|
|
|
|