|
|
|
@ -130,7 +130,7 @@ class ChannelData { |
|
|
|
|
return disconnect_error_.Load(MemoryOrder::ACQUIRE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; } |
|
|
|
|
Mutex* data_plane_mu() const { return &data_plane_mu_; } |
|
|
|
|
|
|
|
|
|
LoadBalancingPolicy::SubchannelPicker* picker() const { |
|
|
|
|
return picker_.get(); |
|
|
|
@ -166,8 +166,6 @@ class ChannelData { |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class SubchannelWrapper; |
|
|
|
|
class ConnectivityStateAndPickerSetter; |
|
|
|
|
class ServiceConfigSetter; |
|
|
|
|
class ClientChannelControlHelper; |
|
|
|
|
|
|
|
|
|
class ExternalConnectivityWatcher { |
|
|
|
@ -214,6 +212,14 @@ class ChannelData { |
|
|
|
|
ChannelData(grpc_channel_element_args* args, grpc_error** error); |
|
|
|
|
~ChannelData(); |
|
|
|
|
|
|
|
|
|
void UpdateStateAndPickerLocked( |
|
|
|
|
grpc_connectivity_state state, const char* reason, |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker); |
|
|
|
|
|
|
|
|
|
void UpdateServiceConfigLocked( |
|
|
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data, |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config); |
|
|
|
|
|
|
|
|
|
void CreateResolvingLoadBalancingPolicyLocked(); |
|
|
|
|
|
|
|
|
|
void DestroyResolvingLoadBalancingPolicyLocked(); |
|
|
|
@ -250,9 +256,9 @@ class ChannelData { |
|
|
|
|
channelz::ChannelNode* channelz_node_; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Fields used in the data plane. Guarded by data_plane_combiner.
|
|
|
|
|
// Fields used in the data plane. Guarded by data_plane_mu.
|
|
|
|
|
//
|
|
|
|
|
grpc_combiner* data_plane_combiner_; |
|
|
|
|
mutable Mutex data_plane_mu_; |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
|
|
|
|
QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
|
|
|
|
|
// Data from service config.
|
|
|
|
@ -282,13 +288,13 @@ class ChannelData { |
|
|
|
|
Map<SubchannelWrapper*, bool> subchannel_wrappers_; |
|
|
|
|
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
|
|
|
|
|
// Updates are queued here in the control plane combiner and then applied
|
|
|
|
|
// in the data plane combiner when the picker is updated.
|
|
|
|
|
// in the data plane mutex when the picker is updated.
|
|
|
|
|
Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>, |
|
|
|
|
RefCountedPtrLess<SubchannelWrapper>> |
|
|
|
|
pending_subchannel_updates_; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Fields accessed from both data plane and control plane combiners.
|
|
|
|
|
// Fields accessed from both data plane mutex and control plane combiner.
|
|
|
|
|
//
|
|
|
|
|
Atomic<grpc_error*> disconnect_error_; |
|
|
|
|
|
|
|
|
@ -322,7 +328,16 @@ class CallData { |
|
|
|
|
void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem); |
|
|
|
|
|
|
|
|
|
// Invoked by channel for queued picks when the picker is updated.
|
|
|
|
|
static void StartPickLocked(void* arg, grpc_error* error); |
|
|
|
|
static void PickSubchannel(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
// Helper function for performing a pick while holding the data plane
|
|
|
|
|
// mutex. Returns true if the pick is complete, in which case the caller
|
|
|
|
|
// must invoke PickDone() or AsyncPickDone() with the returned error.
|
|
|
|
|
bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error); |
|
|
|
|
|
|
|
|
|
// Schedules a callback to process the completed pick. The callback
|
|
|
|
|
// will not run until after this method returns.
|
|
|
|
|
void AsyncPickDone(grpc_call_element* elem, grpc_error* error); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class QueuedPickCanceller; |
|
|
|
@ -931,7 +946,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
return connected_subchannel_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Caller must be holding the data-plane combiner.
|
|
|
|
|
// Caller must be holding the data-plane mutex.
|
|
|
|
|
ConnectedSubchannel* connected_subchannel_in_data_plane() const { |
|
|
|
|
return connected_subchannel_in_data_plane_.get(); |
|
|
|
|
} |
|
|
|
@ -1059,7 +1074,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
// Update the connected subchannel only if the channel is not shutting
|
|
|
|
|
// down. This is because once the channel is shutting down, we
|
|
|
|
|
// ignore picker updates from the LB policy, which means that
|
|
|
|
|
// ConnectivityStateAndPickerSetter will never process the entries
|
|
|
|
|
// UpdateStateAndPickerLocked() will never process the entries
|
|
|
|
|
// in chand_->pending_subchannel_updates_. So we don't want to add
|
|
|
|
|
// entries there that will never be processed, since that would
|
|
|
|
|
// leave dangling refs to the channel and prevent its destruction.
|
|
|
|
@ -1069,7 +1084,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
if (connected_subchannel_ != connected_subchannel) { |
|
|
|
|
connected_subchannel_ = std::move(connected_subchannel); |
|
|
|
|
// Record the new connected subchannel so that it can be updated
|
|
|
|
|
// in the data plane combiner the next time the picker is updated.
|
|
|
|
|
// in the data plane mutex the next time the picker is updated.
|
|
|
|
|
chand_->pending_subchannel_updates_[Ref( |
|
|
|
|
DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_; |
|
|
|
|
} |
|
|
|
@ -1086,159 +1101,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_; |
|
|
|
|
// To be accessed only in the control plane combiner.
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; |
|
|
|
|
// To be accessed only in the data plane combiner.
|
|
|
|
|
// To be accessed only in the data plane mutex.
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ChannelData::ConnectivityStateAndPickerSetter
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// A fire-and-forget class that sets the channel's connectivity state
|
|
|
|
|
// and then hops into the data plane combiner to update the picker.
|
|
|
|
|
// Must be instantiated while holding the control plane combiner.
|
|
|
|
|
// Deletes itself when done.
|
|
|
|
|
class ChannelData::ConnectivityStateAndPickerSetter { |
|
|
|
|
public: |
|
|
|
|
ConnectivityStateAndPickerSetter( |
|
|
|
|
ChannelData* chand, grpc_connectivity_state state, const char* reason, |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) |
|
|
|
|
: chand_(chand), picker_(std::move(picker)) { |
|
|
|
|
// Clean the control plane when entering IDLE, while holding control plane
|
|
|
|
|
// combiner.
|
|
|
|
|
if (picker_ == nullptr) { |
|
|
|
|
chand->health_check_service_name_.reset(); |
|
|
|
|
chand->saved_service_config_.reset(); |
|
|
|
|
chand->received_first_resolver_result_ = false; |
|
|
|
|
} |
|
|
|
|
// Update connectivity state here, while holding control plane combiner.
|
|
|
|
|
grpc_connectivity_state_set(&chand->state_tracker_, state, reason); |
|
|
|
|
if (chand->channelz_node_ != nullptr) { |
|
|
|
|
chand->channelz_node_->SetConnectivityState(state); |
|
|
|
|
chand->channelz_node_->AddTraceEvent( |
|
|
|
|
channelz::ChannelTrace::Severity::Info, |
|
|
|
|
grpc_slice_from_static_string( |
|
|
|
|
channelz::ChannelNode::GetChannelConnectivityStateChangeString( |
|
|
|
|
state))); |
|
|
|
|
} |
|
|
|
|
// Grab any pending subchannel updates.
|
|
|
|
|
pending_subchannel_updates_ = |
|
|
|
|
std::move(chand_->pending_subchannel_updates_); |
|
|
|
|
// Bounce into the data plane combiner to reset the picker.
|
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, |
|
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
|
GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this, |
|
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner_)); |
|
|
|
|
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static void SetPickerInDataPlane(void* arg, grpc_error* ignored) { |
|
|
|
|
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); |
|
|
|
|
// Handle subchannel updates.
|
|
|
|
|
for (auto& p : self->pending_subchannel_updates_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p: updating subchannel wrapper %p data plane " |
|
|
|
|
"connected_subchannel to %p", |
|
|
|
|
self->chand_, p.first.get(), p.second.get()); |
|
|
|
|
} |
|
|
|
|
p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); |
|
|
|
|
} |
|
|
|
|
// Swap out the picker. We hang on to the old picker so that it can
|
|
|
|
|
// be deleted in the control-plane combiner, since that's where we need
|
|
|
|
|
// to unref the subchannel wrappers that are reffed by the picker.
|
|
|
|
|
self->picker_.swap(self->chand_->picker_); |
|
|
|
|
// Clean the data plane if the updated picker is nullptr.
|
|
|
|
|
if (self->chand_->picker_ == nullptr) { |
|
|
|
|
self->chand_->received_service_config_data_ = false; |
|
|
|
|
self->chand_->retry_throttle_data_.reset(); |
|
|
|
|
self->chand_->service_config_.reset(); |
|
|
|
|
} |
|
|
|
|
// Re-process queued picks.
|
|
|
|
|
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; |
|
|
|
|
pick = pick->next) { |
|
|
|
|
CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
// Pop back into the control plane combiner to delete ourself, so
|
|
|
|
|
// that we make sure to unref subchannel wrappers there. This
|
|
|
|
|
// includes both the ones reffed by the old picker (now stored in
|
|
|
|
|
// self->picker_) and the ones in self->pending_subchannel_updates_.
|
|
|
|
|
GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self, |
|
|
|
|
grpc_combiner_scheduler(self->chand_->combiner_)); |
|
|
|
|
GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void CleanUpInControlPlane(void* arg, grpc_error* ignored) { |
|
|
|
|
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, |
|
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
|
Delete(self); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
|
|
|
|
Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>, |
|
|
|
|
RefCountedPtrLess<SubchannelWrapper>> |
|
|
|
|
pending_subchannel_updates_; |
|
|
|
|
grpc_closure closure_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ChannelData::ServiceConfigSetter
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// A fire-and-forget class that sets the channel's service config data
|
|
|
|
|
// in the data plane combiner. Deletes itself when done.
|
|
|
|
|
class ChannelData::ServiceConfigSetter { |
|
|
|
|
public: |
|
|
|
|
ServiceConfigSetter( |
|
|
|
|
ChannelData* chand, |
|
|
|
|
Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> |
|
|
|
|
retry_throttle_data, |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config) |
|
|
|
|
: chand_(chand), |
|
|
|
|
retry_throttle_data_(retry_throttle_data), |
|
|
|
|
service_config_(std::move(service_config)) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter"); |
|
|
|
|
GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this, |
|
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner_)); |
|
|
|
|
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
static void SetServiceConfigData(void* arg, grpc_error* ignored) { |
|
|
|
|
ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg); |
|
|
|
|
ChannelData* chand = self->chand_; |
|
|
|
|
// Update channel state.
|
|
|
|
|
chand->received_service_config_data_ = true; |
|
|
|
|
if (self->retry_throttle_data_.has_value()) { |
|
|
|
|
chand->retry_throttle_data_ = |
|
|
|
|
internal::ServerRetryThrottleMap::GetDataForServer( |
|
|
|
|
chand->server_name_.get(), |
|
|
|
|
self->retry_throttle_data_.value().max_milli_tokens, |
|
|
|
|
self->retry_throttle_data_.value().milli_token_ratio); |
|
|
|
|
} |
|
|
|
|
chand->service_config_ = std::move(self->service_config_); |
|
|
|
|
// Apply service config to queued picks.
|
|
|
|
|
for (QueuedPick* pick = chand->queued_picks_; pick != nullptr; |
|
|
|
|
pick = pick->next) { |
|
|
|
|
CallData* calld = static_cast<CallData*>(pick->elem->call_data); |
|
|
|
|
calld->MaybeApplyServiceConfigToCallLocked(pick->elem); |
|
|
|
|
} |
|
|
|
|
// Clean up.
|
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, |
|
|
|
|
"ServiceConfigSetter"); |
|
|
|
|
Delete(self); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
|
Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> |
|
|
|
|
retry_throttle_data_; |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config_; |
|
|
|
|
grpc_closure closure_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ChannelData::ExternalConnectivityWatcher::WatcherList
|
|
|
|
|
//
|
|
|
|
@ -1409,9 +1275,7 @@ class ChannelData::ClientChannelControlHelper |
|
|
|
|
} |
|
|
|
|
// Do update only if not shutting down.
|
|
|
|
|
if (disconnect_error == GRPC_ERROR_NONE) { |
|
|
|
|
// Will delete itself.
|
|
|
|
|
New<ConnectivityStateAndPickerSetter>(chand_, state, "helper", |
|
|
|
|
std::move(picker)); |
|
|
|
|
chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1495,7 +1359,6 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) |
|
|
|
|
client_channel_factory_( |
|
|
|
|
ClientChannelFactory::GetFromChannelArgs(args->channel_args)), |
|
|
|
|
channelz_node_(GetChannelzNode(args->channel_args)), |
|
|
|
|
data_plane_combiner_(grpc_combiner_create()), |
|
|
|
|
combiner_(grpc_combiner_create()), |
|
|
|
|
interested_parties_(grpc_pollset_set_create()), |
|
|
|
|
subchannel_pool_(GetSubchannelPool(args->channel_args)), |
|
|
|
@ -1568,13 +1431,108 @@ ChannelData::~ChannelData() { |
|
|
|
|
// Stop backup polling.
|
|
|
|
|
grpc_client_channel_stop_backup_polling(interested_parties_); |
|
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
|
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel"); |
|
|
|
|
GRPC_COMBINER_UNREF(combiner_, "client_channel"); |
|
|
|
|
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); |
|
|
|
|
grpc_connectivity_state_destroy(&state_tracker_); |
|
|
|
|
gpr_mu_destroy(&info_mu_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::UpdateStateAndPickerLocked( |
|
|
|
|
grpc_connectivity_state state, const char* reason, |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) { |
|
|
|
|
// Clean the control plane when entering IDLE.
|
|
|
|
|
if (picker_ == nullptr) { |
|
|
|
|
health_check_service_name_.reset(); |
|
|
|
|
saved_service_config_.reset(); |
|
|
|
|
received_first_resolver_result_ = false; |
|
|
|
|
} |
|
|
|
|
// Update connectivity state.
|
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, state, reason); |
|
|
|
|
if (channelz_node_ != nullptr) { |
|
|
|
|
channelz_node_->SetConnectivityState(state); |
|
|
|
|
channelz_node_->AddTraceEvent( |
|
|
|
|
channelz::ChannelTrace::Severity::Info, |
|
|
|
|
grpc_slice_from_static_string( |
|
|
|
|
channelz::ChannelNode::GetChannelConnectivityStateChangeString( |
|
|
|
|
state))); |
|
|
|
|
} |
|
|
|
|
// Grab data plane lock to do subchannel updates and update the picker.
|
|
|
|
|
//
|
|
|
|
|
// Note that we want to minimize the work done while holding the data
|
|
|
|
|
// plane lock, to keep the critical section small. So, for all of the
|
|
|
|
|
// objects that we might wind up unreffing here, we actually hold onto
|
|
|
|
|
// the refs until after we release the lock, and then unref them at
|
|
|
|
|
// that point. This includes the following:
|
|
|
|
|
// - refs to subchannel wrappers in the keys of pending_subchannel_updates_
|
|
|
|
|
// - ref stored in retry_throttle_data_
|
|
|
|
|
// - ref stored in service_config_
|
|
|
|
|
// - ownership of the existing picker in picker_
|
|
|
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref; |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config_to_unref; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&data_plane_mu_); |
|
|
|
|
// Handle subchannel updates.
|
|
|
|
|
for (auto& p : pending_subchannel_updates_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p: updating subchannel wrapper %p data plane " |
|
|
|
|
"connected_subchannel to %p", |
|
|
|
|
this, p.first.get(), p.second.get()); |
|
|
|
|
} |
|
|
|
|
// Note: We do not remove the entry from pending_subchannel_updates_
|
|
|
|
|
// here, since this would unref the subchannel wrapper; instead,
|
|
|
|
|
// we wait until we've released the lock to clear the map.
|
|
|
|
|
p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); |
|
|
|
|
} |
|
|
|
|
// Swap out the picker.
|
|
|
|
|
// Note: Original value will be destroyed after the lock is released.
|
|
|
|
|
picker_.swap(picker); |
|
|
|
|
// Clean the data plane if the updated picker is nullptr.
|
|
|
|
|
if (picker_ == nullptr) { |
|
|
|
|
received_service_config_data_ = false; |
|
|
|
|
// Note: We save the objects to unref until after the lock is released.
|
|
|
|
|
retry_throttle_data_to_unref = std::move(retry_throttle_data_); |
|
|
|
|
service_config_to_unref = std::move(service_config_); |
|
|
|
|
} |
|
|
|
|
// Re-process queued picks.
|
|
|
|
|
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { |
|
|
|
|
grpc_call_element* elem = pick->elem; |
|
|
|
|
CallData* calld = static_cast<CallData*>(elem->call_data); |
|
|
|
|
grpc_error* error = GRPC_ERROR_NONE; |
|
|
|
|
if (calld->PickSubchannelLocked(elem, &error)) { |
|
|
|
|
calld->AsyncPickDone(elem, error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Clear the pending update map after releasing the lock, to keep the
|
|
|
|
|
// critical section small.
|
|
|
|
|
pending_subchannel_updates_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::UpdateServiceConfigLocked( |
|
|
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data, |
|
|
|
|
RefCountedPtr<ServiceConfig> service_config) { |
|
|
|
|
// Grab data plane lock to update service config.
|
|
|
|
|
//
|
|
|
|
|
// We defer unreffing the old values (and deallocating memory) until
|
|
|
|
|
// after releasing the lock to keep the critical section small.
|
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&data_plane_mu_); |
|
|
|
|
// Update service config.
|
|
|
|
|
received_service_config_data_ = true; |
|
|
|
|
// Old values will be unreffed after lock is released.
|
|
|
|
|
retry_throttle_data_.swap(retry_throttle_data); |
|
|
|
|
service_config_.swap(service_config); |
|
|
|
|
// Apply service config to queued picks.
|
|
|
|
|
for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { |
|
|
|
|
CallData* calld = static_cast<CallData*>(pick->elem->call_data); |
|
|
|
|
calld->MaybeApplyServiceConfigToCallLocked(pick->elem); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Old values will be unreffed after lock is released when they go out
|
|
|
|
|
// of scope.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { |
|
|
|
|
// Instantiate resolving LB policy.
|
|
|
|
|
LoadBalancingPolicy::Args lb_args; |
|
|
|
@ -1746,15 +1704,20 @@ bool ChannelData::ProcessResolverResultLocked( |
|
|
|
|
// if we feel it is unnecessary.
|
|
|
|
|
if (service_config_changed || !chand->received_first_resolver_result_) { |
|
|
|
|
chand->received_first_resolver_result_ = true; |
|
|
|
|
Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> |
|
|
|
|
retry_throttle_data; |
|
|
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; |
|
|
|
|
if (parsed_service_config != nullptr) { |
|
|
|
|
retry_throttle_data = parsed_service_config->retry_throttling(); |
|
|
|
|
Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> |
|
|
|
|
retry_throttle_config = parsed_service_config->retry_throttling(); |
|
|
|
|
if (retry_throttle_config.has_value()) { |
|
|
|
|
retry_throttle_data = |
|
|
|
|
internal::ServerRetryThrottleMap::GetDataForServer( |
|
|
|
|
chand->server_name_.get(), |
|
|
|
|
retry_throttle_config.value().max_milli_tokens, |
|
|
|
|
retry_throttle_config.value().milli_token_ratio); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Create service config setter to update channel state in the data
|
|
|
|
|
// plane combiner. Destroys itself when done.
|
|
|
|
|
New<ServiceConfigSetter>(chand, retry_throttle_data, |
|
|
|
|
chand->saved_service_config_); |
|
|
|
|
chand->UpdateServiceConfigLocked(std::move(retry_throttle_data), |
|
|
|
|
chand->saved_service_config_); |
|
|
|
|
} |
|
|
|
|
UniquePtr<char> processed_lb_policy_name; |
|
|
|
|
chand->ProcessLbPolicy(result, parsed_service_config, |
|
|
|
@ -1838,8 +1801,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { |
|
|
|
|
static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) { |
|
|
|
|
if (chand->disconnect_error() == GRPC_ERROR_NONE) { |
|
|
|
|
// Enter IDLE state.
|
|
|
|
|
New<ConnectivityStateAndPickerSetter>(chand, GRPC_CHANNEL_IDLE, |
|
|
|
|
"channel entering IDLE", nullptr); |
|
|
|
|
chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, |
|
|
|
|
"channel entering IDLE", nullptr); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(op->disconnect_with_error); |
|
|
|
|
} else { |
|
|
|
@ -1848,8 +1811,8 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
chand->disconnect_error_.Store(op->disconnect_with_error, |
|
|
|
|
MemoryOrder::RELEASE); |
|
|
|
|
New<ConnectivityStateAndPickerSetter>( |
|
|
|
|
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API", |
|
|
|
|
chand->UpdateStateAndPickerLocked( |
|
|
|
|
GRPC_CHANNEL_SHUTDOWN, "shutdown from API", |
|
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker>( |
|
|
|
|
New<LoadBalancingPolicy::TransientFailurePicker>( |
|
|
|
|
GRPC_ERROR_REF(op->disconnect_with_error)))); |
|
|
|
@ -2092,8 +2055,8 @@ void CallData::StartTransportStreamOpBatch( |
|
|
|
|
// Add the batch to the pending list.
|
|
|
|
|
calld->PendingBatchesAdd(elem, batch); |
|
|
|
|
// Check if we've already gotten a subchannel call.
|
|
|
|
|
// Note that once we have completed the pick, we do not need to enter
|
|
|
|
|
// the channel combiner, which is more efficient (especially for
|
|
|
|
|
// Note that once we have picked a subchannel, we do not need to acquire
|
|
|
|
|
// the channel's data plane mutex, which is more efficient (especially for
|
|
|
|
|
// streaming calls).
|
|
|
|
|
if (calld->subchannel_call_ != nullptr) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { |
|
|
|
@ -2105,18 +2068,15 @@ void CallData::StartTransportStreamOpBatch( |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// We do not yet have a subchannel call.
|
|
|
|
|
// For batches containing a send_initial_metadata op, enter the channel
|
|
|
|
|
// combiner to start a pick.
|
|
|
|
|
// For batches containing a send_initial_metadata op, acquire the
|
|
|
|
|
// channel's data plane mutex to pick a subchannel.
|
|
|
|
|
if (GPR_LIKELY(batch->send_initial_metadata)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: grabbing data plane mutex to perform pick", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
GRPC_CLOSURE_INIT( |
|
|
|
|
&batch->handler_private.closure, StartPickLocked, elem, |
|
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner())), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
PickSubchannel(elem, GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
// For all other batches, release the call combiner.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { |
|
|
|
@ -2544,8 +2504,8 @@ void CallData::DoRetry(grpc_call_element* elem, |
|
|
|
|
this, next_attempt_time - ExecCtx::Get()->Now()); |
|
|
|
|
} |
|
|
|
|
// Schedule retry after computed delay.
|
|
|
|
|
GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem, |
|
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner())); |
|
|
|
|
GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_); |
|
|
|
|
// Update bookkeeping.
|
|
|
|
|
if (retry_state != nullptr) retry_state->retry_dispatched = true; |
|
|
|
@ -3660,6 +3620,11 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) { |
|
|
|
|
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_SCHED(&pick_closure_, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CallData::PickDone(void* arg, grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
@ -3682,10 +3647,9 @@ class CallData::QueuedPickCanceller { |
|
|
|
|
public: |
|
|
|
|
explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) { |
|
|
|
|
auto* calld = static_cast<CallData*>(elem->call_data); |
|
|
|
|
auto* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller"); |
|
|
|
|
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, |
|
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner())); |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
calld->call_combiner_->SetNotifyOnCancel(&closure_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3694,6 +3658,7 @@ class CallData::QueuedPickCanceller { |
|
|
|
|
auto* self = static_cast<QueuedPickCanceller*>(arg); |
|
|
|
|
auto* chand = static_cast<ChannelData*>(self->elem_->channel_data); |
|
|
|
|
auto* calld = static_cast<CallData*>(self->elem_->call_data); |
|
|
|
|
MutexLock lock(chand->data_plane_mu()); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: cancelling queued pick: " |
|
|
|
@ -3818,23 +3783,38 @@ const char* PickResultTypeName( |
|
|
|
|
GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
void CallData::PickSubchannel(void* arg, grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
|
CallData* calld = static_cast<CallData*>(elem->call_data); |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
GPR_ASSERT(calld->connected_subchannel_ == nullptr); |
|
|
|
|
GPR_ASSERT(calld->subchannel_call_ == nullptr); |
|
|
|
|
// picker's being null means the channel is currently in IDLE state. The
|
|
|
|
|
// incoming call will make the channel exit IDLE and queue itself.
|
|
|
|
|
bool pick_complete; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(chand->data_plane_mu()); |
|
|
|
|
pick_complete = calld->PickSubchannelLocked(elem, &error); |
|
|
|
|
} |
|
|
|
|
if (pick_complete) { |
|
|
|
|
PickDone(elem, error); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool CallData::PickSubchannelLocked(grpc_call_element* elem, |
|
|
|
|
grpc_error** error) { |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
GPR_ASSERT(connected_subchannel_ == nullptr); |
|
|
|
|
GPR_ASSERT(subchannel_call_ == nullptr); |
|
|
|
|
// The picker being null means that the channel is currently in IDLE state.
|
|
|
|
|
// The incoming call will make the channel exit IDLE.
|
|
|
|
|
if (chand->picker() == nullptr) { |
|
|
|
|
// We are currently in the data plane.
|
|
|
|
|
// Bounce into the control plane to exit IDLE.
|
|
|
|
|
chand->CheckConnectivityState(true); |
|
|
|
|
calld->AddCallToQueuedPicksLocked(elem); |
|
|
|
|
return; |
|
|
|
|
// Bounce into the control plane combiner to exit IDLE.
|
|
|
|
|
chand->CheckConnectivityState(/*try_to_connect=*/true); |
|
|
|
|
// Queue the pick, so that it will be attempted once the channel
|
|
|
|
|
// becomes connected.
|
|
|
|
|
AddCallToQueuedPicksLocked(elem); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
// Apply service config to call if needed.
|
|
|
|
|
calld->MaybeApplyServiceConfigToCallLocked(elem); |
|
|
|
|
MaybeApplyServiceConfigToCallLocked(elem); |
|
|
|
|
// If this is a retry, use the send_initial_metadata payload that
|
|
|
|
|
// we've cached; otherwise, use the pending batch. The
|
|
|
|
|
// send_initial_metadata batch will be the first pending batch in the
|
|
|
|
@ -3846,31 +3826,27 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
// subchannel's copy of the metadata batch (which is copied for each
|
|
|
|
|
// attempt) to the LB policy instead the one from the parent channel.
|
|
|
|
|
LoadBalancingPolicy::PickArgs pick_args; |
|
|
|
|
pick_args.call_state = &calld->lb_call_state_; |
|
|
|
|
pick_args.call_state = &lb_call_state_; |
|
|
|
|
Metadata initial_metadata( |
|
|
|
|
calld, |
|
|
|
|
calld->seen_send_initial_metadata_ |
|
|
|
|
? &calld->send_initial_metadata_ |
|
|
|
|
: calld->pending_batches_[0] |
|
|
|
|
this, |
|
|
|
|
seen_send_initial_metadata_ |
|
|
|
|
? &send_initial_metadata_ |
|
|
|
|
: pending_batches_[0] |
|
|
|
|
.batch->payload->send_initial_metadata.send_initial_metadata); |
|
|
|
|
pick_args.initial_metadata = &initial_metadata; |
|
|
|
|
// Grab initial metadata flags so that we can check later if the call has
|
|
|
|
|
// wait_for_ready enabled.
|
|
|
|
|
const uint32_t send_initial_metadata_flags = |
|
|
|
|
calld->seen_send_initial_metadata_ |
|
|
|
|
? calld->send_initial_metadata_flags_ |
|
|
|
|
: calld->pending_batches_[0] |
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
// When done, we schedule this closure to leave the data plane combiner.
|
|
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
seen_send_initial_metadata_ ? send_initial_metadata_flags_ |
|
|
|
|
: pending_batches_[0] |
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
// Attempt pick.
|
|
|
|
|
auto result = chand->picker()->Pick(pick_args); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)", |
|
|
|
|
chand, calld, PickResultTypeName(result.type), |
|
|
|
|
chand, this, PickResultTypeName(result.type), |
|
|
|
|
result.subchannel.get(), grpc_error_string(result.error)); |
|
|
|
|
} |
|
|
|
|
switch (result.type) { |
|
|
|
@ -3879,10 +3855,9 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
grpc_error* disconnect_error = chand->disconnect_error(); |
|
|
|
|
if (disconnect_error != GRPC_ERROR_NONE) { |
|
|
|
|
GRPC_ERROR_UNREF(result.error); |
|
|
|
|
GRPC_CLOSURE_SCHED(&calld->pick_closure_, |
|
|
|
|
GRPC_ERROR_REF(disconnect_error)); |
|
|
|
|
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
break; |
|
|
|
|
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
*error = GRPC_ERROR_REF(disconnect_error); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
// If wait_for_ready is false, then the error indicates the RPC
|
|
|
|
|
// attempt's final status.
|
|
|
|
@ -3890,19 +3865,20 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { |
|
|
|
|
// Retry if appropriate; otherwise, fail.
|
|
|
|
|
grpc_status_code status = GRPC_STATUS_OK; |
|
|
|
|
grpc_error_get_status(result.error, calld->deadline_, &status, nullptr, |
|
|
|
|
grpc_error_get_status(result.error, deadline_, &status, nullptr, |
|
|
|
|
nullptr, nullptr); |
|
|
|
|
if (!calld->enable_retries_ || |
|
|
|
|
!calld->MaybeRetry(elem, nullptr /* batch_data */, status, |
|
|
|
|
nullptr /* server_pushback_md */)) { |
|
|
|
|
const bool retried = enable_retries_ && |
|
|
|
|
MaybeRetry(elem, nullptr /* batch_data */, status, |
|
|
|
|
nullptr /* server_pushback_md */); |
|
|
|
|
if (!retried) { |
|
|
|
|
grpc_error* new_error = |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
"Failed to pick subchannel", &result.error, 1); |
|
|
|
|
GRPC_ERROR_UNREF(result.error); |
|
|
|
|
GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error); |
|
|
|
|
*error = new_error; |
|
|
|
|
} |
|
|
|
|
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
break; |
|
|
|
|
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
return !retried; |
|
|
|
|
} |
|
|
|
|
// If wait_for_ready is true, then queue to retry when we get a new
|
|
|
|
|
// picker.
|
|
|
|
@ -3910,26 +3886,26 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
|
} |
|
|
|
|
// Fallthrough
|
|
|
|
|
case LoadBalancingPolicy::PickResult::PICK_QUEUE: |
|
|
|
|
if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem); |
|
|
|
|
break; |
|
|
|
|
if (!pick_queued_) AddCallToQueuedPicksLocked(elem); |
|
|
|
|
return false; |
|
|
|
|
default: // PICK_COMPLETE
|
|
|
|
|
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
// Handle drops.
|
|
|
|
|
if (GPR_UNLIKELY(result.subchannel == nullptr)) { |
|
|
|
|
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"Call dropped by load balancing policy"); |
|
|
|
|
} else { |
|
|
|
|
// Grab a ref to the connected subchannel while we're still
|
|
|
|
|
// holding the data plane combiner.
|
|
|
|
|
calld->connected_subchannel_ = |
|
|
|
|
// holding the data plane mutex.
|
|
|
|
|
connected_subchannel_ = |
|
|
|
|
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); |
|
|
|
|
GPR_ASSERT(calld->connected_subchannel_ != nullptr); |
|
|
|
|
GPR_ASSERT(connected_subchannel_ != nullptr); |
|
|
|
|
} |
|
|
|
|
calld->lb_recv_trailing_metadata_ready_ = |
|
|
|
|
result.recv_trailing_metadata_ready; |
|
|
|
|
calld->lb_recv_trailing_metadata_ready_user_data_ = |
|
|
|
|
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready; |
|
|
|
|
lb_recv_trailing_metadata_ready_user_data_ = |
|
|
|
|
result.recv_trailing_metadata_ready_user_data; |
|
|
|
|
GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error); |
|
|
|
|
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); |
|
|
|
|
*error = result.error; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|