|
|
|
@@ -353,9 +353,11 @@ class DynamicTerminationFilter::CallData {
         args, pollent, nullptr,
         service_config_call_data->call_dispatch_controller(),
         /*is_transparent_retry=*/false);
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
-        chand, client_channel, calld->lb_call_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
+              client_channel, calld->lb_call_.get());
+    }
   }
 
  private:
@@ -406,8 +408,9 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
   }
 
   ~ResolverResultHandler() override {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
+    }
     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
   }
 
@@ -448,9 +451,11 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
         chand_(chand),
         subchannel_(std::move(subchannel)),
         health_check_service_name_(std::move(health_check_service_name)) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: creating subchannel wrapper %p for subchannel %p",
-        chand, this, subchannel_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: creating subchannel wrapper %p for subchannel %p",
+              chand, this, subchannel_.get());
+    }
     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
     auto* subchannel_node = subchannel_->channelz_node();
     if (subchannel_node != nullptr) {
@@ -466,10 +471,11 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
   }
 
   ~SubchannelWrapper() override {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO,
-        "chand=%p: destroying subchannel wrapper %p for subchannel %p", chand_,
-        this, subchannel_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
+              chand_, this, subchannel_.get());
+    }
     chand_->subchannel_wrappers_.erase(this);
     auto* subchannel_node = subchannel_->channelz_node();
     if (subchannel_node != nullptr) {
@@ -531,12 +537,13 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
 
   void UpdateHealthCheckServiceName(
       absl::optional<std::string> health_check_service_name) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO,
-        "chand=%p: subchannel wrapper %p: updating health check service "
-        "name from \"%s\" to \"%s\"",
-        chand_, this, health_check_service_name_->c_str(),
-        health_check_service_name->c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: subchannel wrapper %p: updating health check service "
+              "name from \"%s\" to \"%s\"",
+              chand_, this, health_check_service_name_->c_str(),
+              health_check_service_name->c_str());
+    }
     for (auto& p : watcher_map_) {
       WatcherWrapper*& watcher_wrapper = p.second;
       // Cancel the current watcher and create a new one using the new
@@ -616,11 +623,12 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
     }
 
     void OnConnectivityStateChange() override {
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p: connectivity change for subchannel wrapper %p "
-          "subchannel %p; hopping into work_serializer",
-          parent_->chand_, parent_.get(), parent_->subchannel_.get());
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: connectivity change for subchannel wrapper %p "
+                "subchannel %p; hopping into work_serializer",
+                parent_->chand_, parent_.get(), parent_->subchannel_.get());
+      }
       Ref().release();  // ref owned by lambda
       parent_->chand_->work_serializer_->Run(
           [this]()
@@ -650,13 +658,14 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
    private:
     void ApplyUpdateInControlPlaneWorkSerializer()
         ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p: processing connectivity change in work serializer "
-          "for subchannel wrapper %p subchannel %p "
-          "watcher=%p",
-          parent_->chand_, parent_.get(), parent_->subchannel_.get(),
-          watcher_.get());
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: processing connectivity change in work serializer "
+                "for subchannel wrapper %p subchannel %p "
+                "watcher=%p",
+                parent_->chand_, parent_.get(), parent_->subchannel_.get(),
+                watcher_.get());
+      }
       ConnectivityStateChange state_change = PopConnectivityStateChange();
       absl::optional<absl::Cord> keepalive_throttling =
           state_change.status.GetPayload(kKeepaliveThrottlingKey);
@@ -666,9 +675,10 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
                              &new_keepalive_time)) {
           if (new_keepalive_time > parent_->chand_->keepalive_time_) {
             parent_->chand_->keepalive_time_ = new_keepalive_time;
-            grpc_client_channel_routing_trace.Log(
-                GPR_INFO, "chand=%p: throttling keepalive time to %d",
-                parent_->chand_, parent_->chand_->keepalive_time_);
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
+                      parent_->chand_, parent_->chand_->keepalive_time_);
+            }
             // Propagate the new keepalive time to all subchannels. This is so
             // that new transports created by any subchannel (and not just the
             // subchannel that received the GOAWAY), use the new keepalive time.
@@ -995,8 +1005,9 @@ class ClientChannel::ClientChannelControlHelper
   void RequestReresolution() override
       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
     if (chand_->resolver_ == nullptr) return;  // Shutting down.
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: started name re-resolving", chand_);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
+    }
     chand_->resolver_->RequestReresolutionLocked();
   }
 
@@ -1078,9 +1089,10 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args,
       state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
       subchannel_pool_(GetSubchannelPool(args->channel_args)),
       disconnect_error_(GRPC_ERROR_NONE) {
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: creating client_channel for channel stack %p", this,
-      owning_stack_);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
+            this, owning_stack_);
+  }
   // Start backup polling.
   grpc_client_channel_start_backup_polling(interested_parties_);
   // Check client channel factory.
@@ -1139,8 +1151,9 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args,
 }
 
 ClientChannel::~ClientChannel() {
-  grpc_client_channel_routing_trace.Log(GPR_INFO,
-                                        "chand=%p: destroying channel", this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
+  }
   DestroyResolverAndLbPolicyLocked();
   grpc_channel_args_destroy(channel_args_);
   GRPC_ERROR_UNREF(resolver_transient_failure_error_);
@@ -1211,8 +1224,9 @@ RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
 void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
   // Handle race conditions.
   if (resolver_ == nullptr) return;
-  grpc_client_channel_routing_trace.Log(GPR_INFO,
-                                        "chand=%p: got resolver result", this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
+  }
   // We only want to trace the address resolution in the follow cases:
   // (a) Address resolution resulted in service config change.
   // (b) Address resolution that causes number of backends to go from
@@ -1240,17 +1254,19 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
   RefCountedPtr<ServiceConfig> service_config;
   RefCountedPtr<ConfigSelector> config_selector;
   if (result.service_config_error != GRPC_ERROR_NONE) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: resolver returned service config error: %s", this,
-        grpc_error_std_string(result.service_config_error).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
+              this, grpc_error_std_string(result.service_config_error).c_str());
+    }
     // If the service config was invalid, then fallback to the
     // previously returned service config.
     if (saved_service_config_ != nullptr) {
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p: resolver returned invalid service config. "
-          "Continuing to use previous service config.",
-          this);
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: resolver returned invalid service config. "
+                "Continuing to use previous service config.",
+                this);
+      }
       service_config = saved_service_config_;
       config_selector = saved_config_selector_;
     } else {
@@ -1262,11 +1278,12 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
     }
   } else if (result.service_config == nullptr) {
     // Resolver did not return any service config.
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO,
-        "chand=%p: resolver returned no service config. Using default "
-        "service config for channel.",
-        this);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: resolver returned no service config. Using default "
+              "service config for channel.",
+              this);
+    }
     service_config = default_service_config_;
   } else {
     // Use ServiceConfig and ConfigSelector returned by resolver.
@@ -1295,9 +1312,8 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
     UpdateServiceConfigInControlPlaneLocked(
         std::move(service_config), std::move(config_selector),
         parsed_service_config, lb_policy_config->name());
-  } else {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: service config not changed", this);
+  } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
   }
   // Create or update LB policy, as needed.
   CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
@@ -1329,9 +1345,10 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
     GRPC_ERROR_UNREF(error);
     return;
   }
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: resolver transient failure: %s", this,
-      grpc_error_std_string(error).c_str());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
+            grpc_error_std_string(error).c_str());
+  }
   // If we already have an LB policy from a previous resolution
   // result, then we continue to let it set the connectivity state.
   // Otherwise, we go into TRANSIENT_FAILURE.
@@ -1382,8 +1399,10 @@ void ClientChannel::CreateOrUpdateLbPolicyLocked(
     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
   }
   // Update the policy.
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: Updating child policy %p", this, lb_policy_.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
+            lb_policy_.get());
+  }
   lb_policy_->UpdateLocked(std::move(update_args));
 }
 
@@ -1398,8 +1417,10 @@ OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
   OrphanablePtr<LoadBalancingPolicy> lb_policy =
       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                          &grpc_client_channel_routing_trace);
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: created new LB policy %p", this, lb_policy.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
+            lb_policy.get());
+  }
   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                    interested_parties_);
   return lb_policy;
@@ -1436,9 +1457,11 @@ void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
     const char* lb_policy_name) {
   UniquePtr<char> service_config_json(
       gpr_strdup(service_config->json_string().c_str()));
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: resolver returned updated service config: \"%s\"",
-      this, service_config_json.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p: resolver returned updated service config: \"%s\"", this,
+            service_config_json.get());
+  }
   // Save service config.
   saved_service_config_ = std::move(service_config);
   // Update health check service name if needed.
@@ -1461,9 +1484,10 @@ void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
   }
   // Save config selector.
   saved_config_selector_ = std::move(config_selector);
-  grpc_client_channel_routing_trace.Log(GPR_INFO,
-                                        "chand=%p: using ConfigSelector %p",
-                                        this, saved_config_selector_.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
+            saved_config_selector_.get());
+  }
 }
 
 void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
@@ -1471,9 +1495,10 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
   RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
   // Grab ref to config selector. Use default if resolver didn't supply one.
   RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
-      saved_config_selector_.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
+            saved_config_selector_.get());
+  }
   if (config_selector == nullptr) {
     config_selector =
         MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
@@ -1543,8 +1568,9 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
 }
 
 void ClientChannel::CreateResolverLocked() {
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: starting name resolution", this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
+  }
   resolver_ = ResolverRegistry::CreateResolver(
       target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
       absl::make_unique<ResolverResultHandler>(this));
@@ -1555,19 +1581,23 @@ void ClientChannel::CreateResolverLocked() {
       GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
       absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
   resolver_->StartLocked();
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
+  }
 }
 
 void ClientChannel::DestroyResolverAndLbPolicyLocked() {
   if (resolver_ != nullptr) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p: shutting down resolver=%p", this, resolver_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
+              resolver_.get());
+    }
     resolver_.reset();
     if (lb_policy_ != nullptr) {
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
-          lb_policy_.get());
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
+                lb_policy_.get());
+      }
       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                        interested_parties_);
       lb_policy_.reset();
@@ -1620,11 +1650,12 @@ void ClientChannel::UpdateStateAndPickerLocked(
     MutexLock lock(&data_plane_mu_);
     // Handle subchannel updates.
     for (auto& p : pending_subchannel_updates_) {
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p: updating subchannel wrapper %p data plane "
-          "connected_subchannel to %p",
-          this, p.first.get(), p.second.get());
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: updating subchannel wrapper %p data plane "
+                "connected_subchannel to %p",
+                this, p.first.get(), p.second.get());
+      }
       // Note: We do not remove the entry from pending_subchannel_updates_
       // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
@@ -1755,9 +1786,10 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
   }
   // Disconnect or enter IDLE.
   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
-        grpc_error_std_string(op->disconnect_with_error).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
+              grpc_error_std_string(op->disconnect_with_error).c_str());
+    }
     DestroyResolverAndLbPolicyLocked();
     intptr_t value;
     if (grpc_error_get_int(op->disconnect_with_error,
@@ -1904,8 +1936,9 @@ ClientChannel::CallData::CallData(grpc_call_element* elem,
       owning_call_(args.call_stack),
       call_combiner_(args.call_combiner),
       call_context_(args.context) {
-  grpc_client_channel_call_trace.Log(
-      GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
+  }
 }
 
 ClientChannel::CallData::~CallData() {
@@ -1958,9 +1991,10 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
   // Note that once we have done so, we do not need to acquire the channel's
   // resolution mutex, which is more efficient (especially for streaming calls).
   if (calld->dynamic_call_ != nullptr) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p", chand,
-        calld, calld->dynamic_call_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
+              chand, calld, calld->dynamic_call_.get());
+    }
     calld->dynamic_call_->StartTransportStreamOpBatch(batch);
     return;
   }
@@ -1968,9 +2002,11 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
   //
   // If we've previously been cancelled, immediately fail any new batches.
   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: failing batch with error: %s", chand,
-        calld, grpc_error_std_string(calld->cancel_error_).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
+              chand, calld,
+              grpc_error_std_string(calld->cancel_error_).c_str());
+    }
     // Note: This will release the call combiner.
     grpc_transport_stream_op_batch_finish_with_failure(
         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
@@ -1986,9 +2022,10 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
     GRPC_ERROR_UNREF(calld->cancel_error_);
     calld->cancel_error_ =
         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand, calld,
-        grpc_error_std_string(calld->cancel_error_).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
+              calld, grpc_error_std_string(calld->cancel_error_).c_str());
+    }
     // Fail all pending batches.
     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                               NoYieldCallCombiner);
@@ -2003,17 +2040,20 @@ void ClientChannel::CallData::StartTransportStreamOpBatch(
   // channel's resolution mutex to apply the service config to the call,
   // after which we will create a dynamic call.
   if (GPR_LIKELY(batch->send_initial_metadata)) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO,
-        "chand=%p calld=%p: grabbing resolution mutex to apply service "
-        "config",
-        chand, calld);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: grabbing resolution mutex to apply service "
+              "config",
+              chand, calld);
+    }
     CheckResolution(elem, GRPC_ERROR_NONE);
   } else {
     // For all other batches, release the call combiner.
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: saved batch, yielding call combiner",
-        chand, calld);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
+              calld);
+    }
     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                             "batch does not include send_initial_metadata");
   }
@@ -2048,9 +2088,11 @@ void ClientChannel::CallData::PendingBatchesAdd(
     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
   const size_t idx = GetBatchIndex(batch);
-  grpc_client_channel_call_trace.Log(
-      GPR_INFO, "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
-      chand, this, idx);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
+            this, idx);
+  }
   grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
   GPR_ASSERT(pending == nullptr);
   pending = batch;
@@ -2168,12 +2210,13 @@ class ClientChannel::CallData::ResolverQueuedCallCanceller {
     auto* calld = static_cast<CallData*>(self->elem_->call_data);
     {
       MutexLock lock(&chand->resolution_mu_);
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p calld=%p: cancelling resolver queued pick: "
-          "error=%s self=%p calld->resolver_pick_canceller=%p",
-          chand, calld, grpc_error_std_string(error).c_str(), self,
-          calld->resolver_call_canceller_);
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: cancelling resolver queued pick: "
+                "error=%s self=%p calld->resolver_pick_canceller=%p",
+                chand, calld, grpc_error_std_string(error).c_str(), self,
+                calld->resolver_call_canceller_);
+      }
       if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
         // Remove pick from list of queued picks.
         calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
@@ -2194,9 +2237,11 @@ void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
     grpc_call_element* elem) {
   if (!queued_pending_resolver_result_) return;
   auto* chand = static_cast<ClientChannel*>(elem->channel_data);
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p calld=%p: removing from resolver queued picks list",
-      chand, this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: removing from resolver queued picks list",
+            chand, this);
+  }
   chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
   queued_pending_resolver_result_ = false;
   // Lame the call combiner canceller.
@@ -2207,9 +2252,10 @@ void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
     grpc_call_element* elem) {
   if (queued_pending_resolver_result_) return;
   auto* chand = static_cast<ClientChannel*>(elem->channel_data);
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
-      chand, this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
+            chand, this);
+  }
   queued_pending_resolver_result_ = true;
   resolver_queued_call_.elem = elem;
   chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
@@ -2220,9 +2266,10 @@ void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
 grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
     grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p calld=%p: applying service config to call", chand,
-      this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
+            chand, this);
+  }
   ConfigSelector* config_selector = chand->config_selector_.get();
   if (config_selector != nullptr) {
     // Use the ConfigSelector to determine the config for the call.
@@ -2314,9 +2361,11 @@ void ClientChannel::CallData::ResolutionDone(void* arg,
   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
   CallData* calld = static_cast<CallData*>(elem->call_data);
   if (error != GRPC_ERROR_NONE) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: error applying config to call: error=%s",
-        chand, calld, grpc_error_std_string(error).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: error applying config to call: error=%s",
+              chand, calld, grpc_error_std_string(error).c_str());
+    }
     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
     return;
   }
@@ -2413,15 +2462,19 @@ void ClientChannel::CallData::CreateDynamicCall(grpc_call_element* elem) {
                                call_combiner_};
   grpc_error_handle error = GRPC_ERROR_NONE;
   DynamicFilters* channel_stack = args.channel_stack.get();
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO,
-      "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
-      chand, this, channel_stack);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(
+        GPR_INFO,
+        "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
+        chand, this, channel_stack);
+  }
   dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
   if (error != GRPC_ERROR_NONE) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p calld=%p: failed to create dynamic call: error=%s",
-        chand, this, grpc_error_std_string(error).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: failed to create dynamic call: error=%s",
+              chand, this, grpc_error_std_string(error).c_str());
+    }
     PendingBatchesFail(elem, error, YieldCallCombiner);
     return;
   }
@@ -2614,9 +2667,11 @@ size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
 void ClientChannel::LoadBalancedCall::PendingBatchesAdd(
     grpc_transport_stream_op_batch* batch) {
   const size_t idx = GetBatchIndex(batch);
-  grpc_client_channel_call_trace.Log(
-      GPR_INFO, "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
-      chand_, this, idx);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
+            chand_, this, idx);
+  }
   GPR_ASSERT(pending_batches_[idx] == nullptr);
   pending_batches_[idx] = batch;
 }
@@ -2773,9 +2828,11 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
   // the channel's data plane mutex, which is more efficient (especially for
   // streaming calls).
   if (subchannel_call_ != nullptr) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
-        chand_, this, subchannel_call_.get());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
+              chand_, this, subchannel_call_.get());
+    }
     subchannel_call_->StartTransportStreamOpBatch(batch);
     return;
   }
@@ -2783,9 +2840,10 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
   //
   // If we've previously been cancelled, immediately fail any new batches.
   if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s", chand_,
-        this, grpc_error_std_string(cancel_error_).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
+              chand_, this, grpc_error_std_string(cancel_error_).c_str());
+    }
     // Note: This will release the call combiner.
     grpc_transport_stream_op_batch_finish_with_failure(
         batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
@@ -2800,9 +2858,10 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
     // error to the caller when the first batch does get passed down.
     GRPC_ERROR_UNREF(cancel_error_);
    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s", chand_,
-        this, grpc_error_std_string(cancel_error_).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
+              chand_, this, grpc_error_std_string(cancel_error_).c_str());
+    }
     // Fail all pending batches.
     PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
     // Note: This will release the call combiner.
@@ -2815,16 +2874,19 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
   // For batches containing a send_initial_metadata op, acquire the
   // channel's data plane mutex to pick a subchannel.
   if (GPR_LIKELY(batch->send_initial_metadata)) {
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO,
-        "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
-        chand_, this);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
+              chand_, this);
+    }
     PickSubchannel(this, GRPC_ERROR_NONE);
   } else {
     // For all other batches, release the call combiner.
-    grpc_client_channel_call_trace.Log(
-        GPR_INFO, "chand=%p lb_call=%p: saved batch, yielding call combiner",
-        chand_, this);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: saved batch, yielding call combiner",
+              chand_, this);
+    }
     GRPC_CALL_COMBINER_STOP(call_combiner_,
                             "batch does not include send_initial_metadata");
   }
@@ -2927,10 +2989,11 @@ void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
                                        call_context_, call_combiner_};
   grpc_error_handle error = GRPC_ERROR_NONE;
   subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p lb_call=%p: create subchannel_call=%p: error=%s",
-      chand_, this, subchannel_call_.get(),
-      grpc_error_std_string(error).c_str());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
+            this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
+  }
   if (on_call_destruction_complete_ != nullptr) {
     subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
     on_call_destruction_complete_ = nullptr;
@@ -2965,12 +3028,13 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
     auto* chand = lb_call->chand_;
     {
       MutexLock lock(&chand->data_plane_mu_);
-      grpc_client_channel_routing_trace.Log(
-          GPR_INFO,
-          "chand=%p lb_call=%p: cancelling queued pick: "
-          "error=%s self=%p calld->pick_canceller=%p",
-          chand, lb_call, grpc_error_std_string(error).c_str(), self,
-          lb_call->lb_call_canceller_);
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p lb_call=%p: cancelling queued pick: "
+                "error=%s self=%p calld->pick_canceller=%p",
+                chand, lb_call, grpc_error_std_string(error).c_str(), self,
+                lb_call->lb_call_canceller_);
+      }
       if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
         lb_call->call_dispatch_controller_->Commit();
         // Remove pick from list of queued picks.
@@ -2990,9 +3054,10 @@ class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
 
 void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
   if (!queued_pending_lb_pick_) return;
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list", chand_,
-      this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
+            chand_, this);
+  }
   chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
   queued_pending_lb_pick_ = false;
   // Lame the call combiner canceller.
@@ -3001,9 +3066,10 @@ void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
 
 void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
   if (queued_pending_lb_pick_) return;
-  grpc_client_channel_routing_trace.Log(
-      GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list", chand_,
-      this);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
+            chand_, this);
+  }
   queued_pending_lb_pick_ = true;
   queued_call_.lb_call = this;
   chand_->AddLbQueuedCall(&queued_call_, pollent_);
@@ -3021,9 +3087,11 @@ void ClientChannel::LoadBalancedCall::PickDone(void* arg,
                                                grpc_error_handle error) {
   auto* self = static_cast<LoadBalancedCall*>(arg);
   if (error != GRPC_ERROR_NONE) {
-    grpc_client_channel_routing_trace.Log(
-        GPR_INFO, "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
-        self->chand_, self, grpc_error_std_string(error).c_str());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
+              self->chand_, self, grpc_error_std_string(error).c_str());
+    }
     self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
     return;
   }
@@ -3069,10 +3137,11 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
       // CompletePick
       [this](LoadBalancingPolicy::PickResult::Complete* complete_pick)
           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
-            grpc_client_channel_routing_trace.Log(
-                GPR_INFO,
-                "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p", chand_,
-                this, complete_pick->subchannel.get());
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO,
+                      "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
+                      chand_, this, complete_pick->subchannel.get());
+            }
             GPR_ASSERT(complete_pick->subchannel != nullptr);
             // Grab a ref to the connected subchannel while we're still
             // holding the data plane mutex.
@@ -3087,8 +3156,10 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
       // QueuePick
       [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/)
           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
-            grpc_client_channel_routing_trace.Log(
-                GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_, this);
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
+                      this);
+            }
             MaybeAddCallToLbQueuedCallsLocked();
             return false;
           },
@@ -3096,9 +3167,10 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
       [this, send_initial_metadata_flags,
        &error](LoadBalancingPolicy::PickResult::Fail* fail_pick)
           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
-            grpc_client_channel_routing_trace.Log(
-                GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
-                this, fail_pick->status.ToString().c_str());
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s",
+                      chand_, this, fail_pick->status.ToString().c_str());
+            }
             // If we're shutting down, fail all RPCs.
             grpc_error_handle disconnect_error = chand_->disconnect_error();
             if (disconnect_error != GRPC_ERROR_NONE) {
@@ -3126,9 +3198,10 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
       // DropPick
       [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
-            grpc_client_channel_routing_trace.Log(
-                GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
-                this, drop_pick->status.ToString().c_str());
+            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+              gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s",
+                      chand_, this, drop_pick->status.ToString().c_str());
+            }
             *error =
                 grpc_error_set_int(absl_status_to_grpc_error(drop_pick->status),
                                    GRPC_ERROR_INT_LB_POLICY_DROP, 1);