|
|
@ -147,9 +147,6 @@ class ChannelData { |
|
|
|
return service_config_; |
|
|
|
return service_config_; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane( |
|
|
|
|
|
|
|
SubchannelInterface* subchannel) const; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_connectivity_state CheckConnectivityState(bool try_to_connect); |
|
|
|
grpc_connectivity_state CheckConnectivityState(bool try_to_connect); |
|
|
|
void AddExternalConnectivityWatcher(grpc_polling_entity pollent, |
|
|
|
void AddExternalConnectivityWatcher(grpc_polling_entity pollent, |
|
|
|
grpc_connectivity_state* state, |
|
|
|
grpc_connectivity_state* state, |
|
|
@ -164,9 +161,9 @@ class ChannelData { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
private: |
|
|
|
class SubchannelWrapper; |
|
|
|
|
|
|
|
class ConnectivityStateAndPickerSetter; |
|
|
|
class ConnectivityStateAndPickerSetter; |
|
|
|
class ServiceConfigSetter; |
|
|
|
class ServiceConfigSetter; |
|
|
|
|
|
|
|
class GrpcSubchannel; |
|
|
|
class ClientChannelControlHelper; |
|
|
|
class ClientChannelControlHelper; |
|
|
|
|
|
|
|
|
|
|
|
class ExternalConnectivityWatcher { |
|
|
|
class ExternalConnectivityWatcher { |
|
|
@ -265,14 +262,7 @@ class ChannelData { |
|
|
|
UniquePtr<char> health_check_service_name_; |
|
|
|
UniquePtr<char> health_check_service_name_; |
|
|
|
RefCountedPtr<ServiceConfig> saved_service_config_; |
|
|
|
RefCountedPtr<ServiceConfig> saved_service_config_; |
|
|
|
bool received_first_resolver_result_ = false; |
|
|
|
bool received_first_resolver_result_ = false; |
|
|
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
|
|
|
|
|
|
|
Map<Subchannel*, int> subchannel_refcount_map_; |
|
|
|
Map<Subchannel*, int> subchannel_refcount_map_; |
|
|
|
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
|
|
|
|
|
|
|
|
// Updates are queued here in the control plane combiner and then applied
|
|
|
|
|
|
|
|
// in the data plane combiner when the picker is updated.
|
|
|
|
|
|
|
|
Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>, |
|
|
|
|
|
|
|
RefCountedPtrLess<SubchannelWrapper>> |
|
|
|
|
|
|
|
pending_subchannel_updates_; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
|
// Fields accessed from both data plane and control plane combiners.
|
|
|
|
// Fields accessed from both data plane and control plane combiners.
|
|
|
@ -716,247 +706,6 @@ class CallData { |
|
|
|
grpc_metadata_batch send_trailing_metadata_; |
|
|
|
grpc_metadata_batch send_trailing_metadata_; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// ChannelData::SubchannelWrapper
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// This class is a wrapper for Subchannel that hides details of the
|
|
|
|
|
|
|
|
// channel's implementation (such as the health check service name and
|
|
|
|
|
|
|
|
// connected subchannel) from the LB policy API.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// Note that no synchronization is needed here, because even if the
|
|
|
|
|
|
|
|
// underlying subchannel is shared between channels, this wrapper will only
|
|
|
|
|
|
|
|
// be used within one channel, so it will always be synchronized by the
|
|
|
|
|
|
|
|
// control plane combiner.
|
|
|
|
|
|
|
|
class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
|
|
|
public: |
|
|
|
|
|
|
|
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, |
|
|
|
|
|
|
|
UniquePtr<char> health_check_service_name) |
|
|
|
|
|
|
|
: SubchannelInterface(&grpc_client_channel_routing_trace), |
|
|
|
|
|
|
|
chand_(chand), |
|
|
|
|
|
|
|
subchannel_(subchannel), |
|
|
|
|
|
|
|
health_check_service_name_(std::move(health_check_service_name)) { |
|
|
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p: creating subchannel wrapper %p for subchannel %p", |
|
|
|
|
|
|
|
chand, this, subchannel_); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper"); |
|
|
|
|
|
|
|
auto* subchannel_node = subchannel_->channelz_node(); |
|
|
|
|
|
|
|
if (subchannel_node != nullptr) { |
|
|
|
|
|
|
|
intptr_t subchannel_uuid = subchannel_node->uuid(); |
|
|
|
|
|
|
|
auto it = chand_->subchannel_refcount_map_.find(subchannel_); |
|
|
|
|
|
|
|
if (it == chand_->subchannel_refcount_map_.end()) { |
|
|
|
|
|
|
|
chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); |
|
|
|
|
|
|
|
it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
++it->second; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
~SubchannelWrapper() { |
|
|
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p: destroying subchannel wrapper %p for subchannel %p", |
|
|
|
|
|
|
|
chand_, this, subchannel_); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
auto* subchannel_node = subchannel_->channelz_node(); |
|
|
|
|
|
|
|
if (subchannel_node != nullptr) { |
|
|
|
|
|
|
|
intptr_t subchannel_uuid = subchannel_node->uuid(); |
|
|
|
|
|
|
|
auto it = chand_->subchannel_refcount_map_.find(subchannel_); |
|
|
|
|
|
|
|
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); |
|
|
|
|
|
|
|
--it->second; |
|
|
|
|
|
|
|
if (it->second == 0) { |
|
|
|
|
|
|
|
chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); |
|
|
|
|
|
|
|
chand_->subchannel_refcount_map_.erase(it); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); |
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_connectivity_state CheckConnectivityState() override { |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel; |
|
|
|
|
|
|
|
grpc_connectivity_state connectivity_state = |
|
|
|
|
|
|
|
subchannel_->CheckConnectivityState(health_check_service_name_.get(), |
|
|
|
|
|
|
|
&connected_subchannel); |
|
|
|
|
|
|
|
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); |
|
|
|
|
|
|
|
return connectivity_state; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void WatchConnectivityState( |
|
|
|
|
|
|
|
grpc_connectivity_state initial_state, |
|
|
|
|
|
|
|
UniquePtr<ConnectivityStateWatcherInterface> watcher) override { |
|
|
|
|
|
|
|
auto& watcher_wrapper = watcher_map_[watcher.get()]; |
|
|
|
|
|
|
|
GPR_ASSERT(watcher_wrapper == nullptr); |
|
|
|
|
|
|
|
watcher_wrapper = New<WatcherWrapper>( |
|
|
|
|
|
|
|
std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper")); |
|
|
|
|
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
|
|
|
|
initial_state, |
|
|
|
|
|
|
|
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())), |
|
|
|
|
|
|
|
OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>( |
|
|
|
|
|
|
|
watcher_wrapper)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void CancelConnectivityStateWatch( |
|
|
|
|
|
|
|
ConnectivityStateWatcherInterface* watcher) override { |
|
|
|
|
|
|
|
auto it = watcher_map_.find(watcher); |
|
|
|
|
|
|
|
GPR_ASSERT(it != watcher_map_.end()); |
|
|
|
|
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), |
|
|
|
|
|
|
|
it->second); |
|
|
|
|
|
|
|
watcher_map_.erase(it); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void AttemptToConnect() override { subchannel_->AttemptToConnect(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ResetBackoff() override { subchannel_->ResetBackoff(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const grpc_channel_args* channel_args() override { |
|
|
|
|
|
|
|
return subchannel_->channel_args(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Caller must be holding the control-plane combiner.
|
|
|
|
|
|
|
|
ConnectedSubchannel* connected_subchannel() const { |
|
|
|
|
|
|
|
return connected_subchannel_.get(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Caller must be holding the data-plane combiner.
|
|
|
|
|
|
|
|
ConnectedSubchannel* connected_subchannel_in_data_plane() const { |
|
|
|
|
|
|
|
return connected_subchannel_in_data_plane_.get(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
void set_connected_subchannel_in_data_plane( |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { |
|
|
|
|
|
|
|
connected_subchannel_in_data_plane_ = std::move(connected_subchannel); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
// Subchannel and SubchannelInterface have different interfaces for
|
|
|
|
|
|
|
|
// their respective ConnectivityStateWatcherInterface classes.
|
|
|
|
|
|
|
|
// The one in Subchannel updates the ConnectedSubchannel along with
|
|
|
|
|
|
|
|
// the state, whereas the one in SubchannelInterface does not expose
|
|
|
|
|
|
|
|
// the ConnectedSubchannel.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// This wrapper provides a bridge between the two. It implements
|
|
|
|
|
|
|
|
// Subchannel::ConnectivityStateWatcherInterface and wraps
|
|
|
|
|
|
|
|
// the instance of SubchannelInterface::ConnectivityStateWatcherInterface
|
|
|
|
|
|
|
|
// that was passed in by the LB policy. We pass an instance of this
|
|
|
|
|
|
|
|
// class to the underlying Subchannel, and when we get updates from
|
|
|
|
|
|
|
|
// the subchannel, we pass those on to the wrapped watcher to return
|
|
|
|
|
|
|
|
// the update to the LB policy. This allows us to set the connected
|
|
|
|
|
|
|
|
// subchannel before passing the result back to the LB policy.
|
|
|
|
|
|
|
|
class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface { |
|
|
|
|
|
|
|
public: |
|
|
|
|
|
|
|
WatcherWrapper( |
|
|
|
|
|
|
|
UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> |
|
|
|
|
|
|
|
watcher, |
|
|
|
|
|
|
|
RefCountedPtr<SubchannelWrapper> parent) |
|
|
|
|
|
|
|
: watcher_(std::move(watcher)), parent_(std::move(parent)) {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void Orphan() override { Unref(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void OnConnectivityStateChange( |
|
|
|
|
|
|
|
grpc_connectivity_state new_state, |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) override { |
|
|
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p: connectivity change for subchannel wrapper %p " |
|
|
|
|
|
|
|
"subchannel %p (connected_subchannel=%p state=%s); " |
|
|
|
|
|
|
|
"hopping into combiner", |
|
|
|
|
|
|
|
parent_->chand_, parent_.get(), parent_->subchannel_, |
|
|
|
|
|
|
|
connected_subchannel.get(), |
|
|
|
|
|
|
|
grpc_connectivity_state_name(new_state)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Will delete itself.
|
|
|
|
|
|
|
|
New<Updater>(Ref(), new_state, std::move(connected_subchannel)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_pollset_set* interested_parties() override { |
|
|
|
|
|
|
|
return watcher_->interested_parties(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
class Updater { |
|
|
|
|
|
|
|
public: |
|
|
|
|
|
|
|
Updater(RefCountedPtr<WatcherWrapper> parent, |
|
|
|
|
|
|
|
grpc_connectivity_state new_state, |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) |
|
|
|
|
|
|
|
: parent_(std::move(parent)), |
|
|
|
|
|
|
|
state_(new_state), |
|
|
|
|
|
|
|
connected_subchannel_(std::move(connected_subchannel)) { |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT( |
|
|
|
|
|
|
|
&closure_, ApplyUpdateInControlPlaneCombiner, this, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(parent_->parent_->chand_->combiner_)); |
|
|
|
|
|
|
|
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
static void ApplyUpdateInControlPlaneCombiner(void* arg, |
|
|
|
|
|
|
|
grpc_error* error) { |
|
|
|
|
|
|
|
Updater* self = static_cast<Updater*>(arg); |
|
|
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p: processing connectivity change in combiner " |
|
|
|
|
|
|
|
"for subchannel wrapper %p subchannel %p " |
|
|
|
|
|
|
|
"(connected_subchannel=%p state=%s)", |
|
|
|
|
|
|
|
self->parent_->parent_->chand_, self->parent_->parent_.get(), |
|
|
|
|
|
|
|
self->parent_->parent_->subchannel_, |
|
|
|
|
|
|
|
self->connected_subchannel_.get(), |
|
|
|
|
|
|
|
grpc_connectivity_state_name(self->state_)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
self->parent_->parent_->MaybeUpdateConnectedSubchannel( |
|
|
|
|
|
|
|
std::move(self->connected_subchannel_)); |
|
|
|
|
|
|
|
self->parent_->watcher_->OnConnectivityStateChange(self->state_); |
|
|
|
|
|
|
|
Delete(self); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<WatcherWrapper> parent_; |
|
|
|
|
|
|
|
grpc_connectivity_state state_; |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; |
|
|
|
|
|
|
|
grpc_closure closure_; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_; |
|
|
|
|
|
|
|
RefCountedPtr<SubchannelWrapper> parent_; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void MaybeUpdateConnectedSubchannel( |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { |
|
|
|
|
|
|
|
// Update the connected subchannel only if the channel is not shutting
|
|
|
|
|
|
|
|
// down. This is because once the channel is shutting down, we
|
|
|
|
|
|
|
|
// ignore picker updates from the LB policy, which means that
|
|
|
|
|
|
|
|
// ConnectivityStateAndPickerSetter will never process the entries
|
|
|
|
|
|
|
|
// in chand_->pending_subchannel_updates_. So we don't want to add
|
|
|
|
|
|
|
|
// entries there that will never be processed, since that would
|
|
|
|
|
|
|
|
// leave dangling refs to the channel and prevent its destruction.
|
|
|
|
|
|
|
|
grpc_error* disconnect_error = chand_->disconnect_error(); |
|
|
|
|
|
|
|
if (disconnect_error != GRPC_ERROR_NONE) return; |
|
|
|
|
|
|
|
// Not shutting down, so do the update.
|
|
|
|
|
|
|
|
if (connected_subchannel_ != connected_subchannel) { |
|
|
|
|
|
|
|
connected_subchannel_ = std::move(connected_subchannel); |
|
|
|
|
|
|
|
// Record the new connected subchannel so that it can be updated
|
|
|
|
|
|
|
|
// in the data plane combiner the next time the picker is updated.
|
|
|
|
|
|
|
|
chand_->pending_subchannel_updates_[Ref( |
|
|
|
|
|
|
|
DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
|
|
|
|
Subchannel* subchannel_; |
|
|
|
|
|
|
|
UniquePtr<char> health_check_service_name_; |
|
|
|
|
|
|
|
// Maps from the address of the watcher passed to us by the LB policy
|
|
|
|
|
|
|
|
// to the address of the WrapperWatcher that we passed to the underlying
|
|
|
|
|
|
|
|
// subchannel. This is needed so that when the LB policy calls
|
|
|
|
|
|
|
|
// CancelConnectivityStateWatch() with its watcher, we know the
|
|
|
|
|
|
|
|
// corresponding WrapperWatcher to cancel on the underlying subchannel.
|
|
|
|
|
|
|
|
Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_; |
|
|
|
|
|
|
|
// To be accessed only in the control plane combiner.
|
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; |
|
|
|
|
|
|
|
// To be accessed only in the data plane combiner.
|
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
|
// ChannelData::ConnectivityStateAndPickerSetter
|
|
|
|
// ChannelData::ConnectivityStateAndPickerSetter
|
|
|
|
//
|
|
|
|
//
|
|
|
@ -980,13 +729,10 @@ class ChannelData::ConnectivityStateAndPickerSetter { |
|
|
|
grpc_slice_from_static_string( |
|
|
|
grpc_slice_from_static_string( |
|
|
|
GetChannelConnectivityStateChangeString(state))); |
|
|
|
GetChannelConnectivityStateChangeString(state))); |
|
|
|
} |
|
|
|
} |
|
|
|
// Grab any pending subchannel updates.
|
|
|
|
|
|
|
|
pending_subchannel_updates_ = |
|
|
|
|
|
|
|
std::move(chand_->pending_subchannel_updates_); |
|
|
|
|
|
|
|
// Bounce into the data plane combiner to reset the picker.
|
|
|
|
// Bounce into the data plane combiner to reset the picker.
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, |
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, |
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
GRPC_CLOSURE_INIT(&closure_, SetPickerInDataPlane, this, |
|
|
|
GRPC_CLOSURE_INIT(&closure_, SetPicker, this, |
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner_)); |
|
|
|
grpc_combiner_scheduler(chand->data_plane_combiner_)); |
|
|
|
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); |
|
|
|
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); |
|
|
|
} |
|
|
|
} |
|
|
@ -1009,38 +755,16 @@ class ChannelData::ConnectivityStateAndPickerSetter { |
|
|
|
GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
|
|
|
GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void SetPickerInDataPlane(void* arg, grpc_error* ignored) { |
|
|
|
static void SetPicker(void* arg, grpc_error* ignored) { |
|
|
|
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); |
|
|
|
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); |
|
|
|
// Handle subchannel updates.
|
|
|
|
// Update picker.
|
|
|
|
for (auto& p : self->pending_subchannel_updates_) { |
|
|
|
self->chand_->picker_ = std::move(self->picker_); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p: updating subchannel wrapper %p data plane " |
|
|
|
|
|
|
|
"connected_subchannel to %p", |
|
|
|
|
|
|
|
self->chand_, p.first.get(), p.second.get()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Swap out the picker. We hang on to the old picker so that it can
|
|
|
|
|
|
|
|
// be deleted in the control-plane combiner, since that's where we need
|
|
|
|
|
|
|
|
// to unref the subchannel wrappers that are reffed by the picker.
|
|
|
|
|
|
|
|
self->picker_.swap(self->chand_->picker_); |
|
|
|
|
|
|
|
// Re-process queued picks.
|
|
|
|
// Re-process queued picks.
|
|
|
|
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; |
|
|
|
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr; |
|
|
|
pick = pick->next) { |
|
|
|
pick = pick->next) { |
|
|
|
CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); |
|
|
|
CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
// Pop back into the control plane combiner to delete ourself, so
|
|
|
|
// Clean up.
|
|
|
|
// that we make sure to unref subchannel wrappers there. This
|
|
|
|
|
|
|
|
// includes both the ones reffed by the old picker (now stored in
|
|
|
|
|
|
|
|
// self->picker_) and the ones in self->pending_subchannel_updates_.
|
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&self->closure_, CleanUpInControlPlane, self, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(self->chand_->combiner_)); |
|
|
|
|
|
|
|
GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void CleanUpInControlPlane(void* arg, grpc_error* ignored) { |
|
|
|
|
|
|
|
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg); |
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, |
|
|
|
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, |
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
"ConnectivityStateAndPickerSetter"); |
|
|
|
Delete(self); |
|
|
|
Delete(self); |
|
|
@ -1048,9 +772,6 @@ class ChannelData::ConnectivityStateAndPickerSetter { |
|
|
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
ChannelData* chand_; |
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_; |
|
|
|
Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>, |
|
|
|
|
|
|
|
RefCountedPtrLess<SubchannelWrapper>> |
|
|
|
|
|
|
|
pending_subchannel_updates_; |
|
|
|
|
|
|
|
grpc_closure closure_; |
|
|
|
grpc_closure closure_; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
@ -1225,6 +946,89 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( |
|
|
|
&self->chand_->state_tracker_, self->state_, &self->my_closure_); |
|
|
|
&self->chand_->state_tracker_, self->state_, &self->my_closure_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// ChannelData::GrpcSubchannel
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// This class is a wrapper for Subchannel that hides details of the
|
|
|
|
|
|
|
|
// channel's implementation (such as the health check service name) from
|
|
|
|
|
|
|
|
// the LB policy API.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// Note that no synchronization is needed here, because even if the
|
|
|
|
|
|
|
|
// underlying subchannel is shared between channels, this wrapper will only
|
|
|
|
|
|
|
|
// be used within one channel, so it will always be synchronized by the
|
|
|
|
|
|
|
|
// control plane combiner.
|
|
|
|
|
|
|
|
class ChannelData::GrpcSubchannel : public SubchannelInterface { |
|
|
|
|
|
|
|
public: |
|
|
|
|
|
|
|
GrpcSubchannel(ChannelData* chand, Subchannel* subchannel, |
|
|
|
|
|
|
|
UniquePtr<char> health_check_service_name) |
|
|
|
|
|
|
|
: chand_(chand), |
|
|
|
|
|
|
|
subchannel_(subchannel), |
|
|
|
|
|
|
|
health_check_service_name_(std::move(health_check_service_name)) { |
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel"); |
|
|
|
|
|
|
|
auto* subchannel_node = subchannel_->channelz_node(); |
|
|
|
|
|
|
|
if (subchannel_node != nullptr) { |
|
|
|
|
|
|
|
intptr_t subchannel_uuid = subchannel_node->uuid(); |
|
|
|
|
|
|
|
auto it = chand_->subchannel_refcount_map_.find(subchannel_); |
|
|
|
|
|
|
|
if (it == chand_->subchannel_refcount_map_.end()) { |
|
|
|
|
|
|
|
chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); |
|
|
|
|
|
|
|
it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
++it->second; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
~GrpcSubchannel() { |
|
|
|
|
|
|
|
auto* subchannel_node = subchannel_->channelz_node(); |
|
|
|
|
|
|
|
if (subchannel_node != nullptr) { |
|
|
|
|
|
|
|
intptr_t subchannel_uuid = subchannel_node->uuid(); |
|
|
|
|
|
|
|
auto it = chand_->subchannel_refcount_map_.find(subchannel_); |
|
|
|
|
|
|
|
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); |
|
|
|
|
|
|
|
--it->second; |
|
|
|
|
|
|
|
if (it->second == 0) { |
|
|
|
|
|
|
|
chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); |
|
|
|
|
|
|
|
chand_->subchannel_refcount_map_.erase(it); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); |
|
|
|
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grpc_connectivity_state CheckConnectivityState( |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel) |
|
|
|
|
|
|
|
override { |
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> tmp; |
|
|
|
|
|
|
|
auto retval = subchannel_->CheckConnectivityState( |
|
|
|
|
|
|
|
health_check_service_name_.get(), &tmp); |
|
|
|
|
|
|
|
*connected_subchannel = std::move(tmp); |
|
|
|
|
|
|
|
return retval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void WatchConnectivityState( |
|
|
|
|
|
|
|
grpc_connectivity_state initial_state, |
|
|
|
|
|
|
|
UniquePtr<ConnectivityStateWatcher> watcher) override { |
|
|
|
|
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
|
|
|
|
initial_state, |
|
|
|
|
|
|
|
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())), |
|
|
|
|
|
|
|
std::move(watcher)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void CancelConnectivityStateWatch( |
|
|
|
|
|
|
|
ConnectivityStateWatcher* watcher) override { |
|
|
|
|
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), |
|
|
|
|
|
|
|
watcher); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void AttemptToConnect() override { subchannel_->AttemptToConnect(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ResetBackoff() override { subchannel_->ResetBackoff(); } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
|
|
|
|
Subchannel* subchannel_; |
|
|
|
|
|
|
|
UniquePtr<char> health_check_service_name_; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
|
// ChannelData::ClientChannelControlHelper
|
|
|
|
// ChannelData::ClientChannelControlHelper
|
|
|
|
//
|
|
|
|
//
|
|
|
@ -1262,8 +1066,8 @@ class ChannelData::ClientChannelControlHelper |
|
|
|
chand_->client_channel_factory_->CreateSubchannel(new_args); |
|
|
|
chand_->client_channel_factory_->CreateSubchannel(new_args); |
|
|
|
grpc_channel_args_destroy(new_args); |
|
|
|
grpc_channel_args_destroy(new_args); |
|
|
|
if (subchannel == nullptr) return nullptr; |
|
|
|
if (subchannel == nullptr) return nullptr; |
|
|
|
return MakeRefCounted<SubchannelWrapper>( |
|
|
|
return MakeRefCounted<GrpcSubchannel>(chand_, subchannel, |
|
|
|
chand_, subchannel, std::move(health_check_service_name)); |
|
|
|
std::move(health_check_service_name)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
|
grpc_channel* CreateChannel(const char* target, |
|
|
@ -1274,7 +1078,8 @@ class ChannelData::ClientChannelControlHelper |
|
|
|
void UpdateState( |
|
|
|
void UpdateState( |
|
|
|
grpc_connectivity_state state, |
|
|
|
grpc_connectivity_state state, |
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override { |
|
|
|
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override { |
|
|
|
grpc_error* disconnect_error = chand_->disconnect_error(); |
|
|
|
grpc_error* disconnect_error = |
|
|
|
|
|
|
|
chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
const char* extra = disconnect_error == GRPC_ERROR_NONE |
|
|
|
const char* extra = disconnect_error == GRPC_ERROR_NONE |
|
|
|
? "" |
|
|
|
? "" |
|
|
@ -1646,13 +1451,9 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { |
|
|
|
} |
|
|
|
} |
|
|
|
LoadBalancingPolicy::PickResult result = |
|
|
|
LoadBalancingPolicy::PickResult result = |
|
|
|
picker_->Pick(LoadBalancingPolicy::PickArgs()); |
|
|
|
picker_->Pick(LoadBalancingPolicy::PickArgs()); |
|
|
|
ConnectedSubchannel* connected_subchannel = nullptr; |
|
|
|
if (result.connected_subchannel != nullptr) { |
|
|
|
if (result.subchannel != nullptr) { |
|
|
|
ConnectedSubchannel* connected_subchannel = |
|
|
|
SubchannelWrapper* subchannel = |
|
|
|
static_cast<ConnectedSubchannel*>(result.connected_subchannel.get()); |
|
|
|
static_cast<SubchannelWrapper*>(result.subchannel.get()); |
|
|
|
|
|
|
|
connected_subchannel = subchannel->connected_subchannel(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (connected_subchannel != nullptr) { |
|
|
|
|
|
|
|
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); |
|
|
|
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (result.error == GRPC_ERROR_NONE) { |
|
|
|
if (result.error == GRPC_ERROR_NONE) { |
|
|
@ -1695,10 +1496,6 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { |
|
|
|
} |
|
|
|
} |
|
|
|
// Disconnect.
|
|
|
|
// Disconnect.
|
|
|
|
if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
|
|
|
if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: channel shut down from API: %s", chand, |
|
|
|
|
|
|
|
grpc_error_string(op->disconnect_with_error)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
grpc_error* error = GRPC_ERROR_NONE; |
|
|
|
grpc_error* error = GRPC_ERROR_NONE; |
|
|
|
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong( |
|
|
|
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong( |
|
|
|
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL, |
|
|
|
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL, |
|
|
@ -1773,17 +1570,6 @@ void ChannelData::RemoveQueuedPick(QueuedPick* to_remove, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
RefCountedPtr<ConnectedSubchannel> |
|
|
|
|
|
|
|
ChannelData::GetConnectedSubchannelInDataPlane( |
|
|
|
|
|
|
|
SubchannelInterface* subchannel) const { |
|
|
|
|
|
|
|
SubchannelWrapper* subchannel_wrapper = |
|
|
|
|
|
|
|
static_cast<SubchannelWrapper*>(subchannel); |
|
|
|
|
|
|
|
ConnectedSubchannel* connected_subchannel = |
|
|
|
|
|
|
|
subchannel_wrapper->connected_subchannel_in_data_plane(); |
|
|
|
|
|
|
|
if (connected_subchannel == nullptr) return nullptr; |
|
|
|
|
|
|
|
return connected_subchannel->Ref(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { |
|
|
|
void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { |
|
|
|
auto* chand = static_cast<ChannelData*>(arg); |
|
|
|
auto* chand = static_cast<ChannelData*>(arg); |
|
|
|
if (chand->resolving_lb_policy_ != nullptr) { |
|
|
|
if (chand->resolving_lb_policy_ != nullptr) { |
|
|
@ -3711,9 +3497,10 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
auto result = chand->picker()->Pick(pick_args); |
|
|
|
auto result = chand->picker()->Pick(pick_args); |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)", |
|
|
|
"chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, " |
|
|
|
|
|
|
|
"error=%s)", |
|
|
|
chand, calld, PickResultTypeName(result.type), |
|
|
|
chand, calld, PickResultTypeName(result.type), |
|
|
|
result.subchannel.get(), grpc_error_string(result.error)); |
|
|
|
result.connected_subchannel.get(), grpc_error_string(result.error)); |
|
|
|
} |
|
|
|
} |
|
|
|
switch (result.type) { |
|
|
|
switch (result.type) { |
|
|
|
case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: { |
|
|
|
case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: { |
|
|
@ -3755,16 +3542,11 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { |
|
|
|
break; |
|
|
|
break; |
|
|
|
default: // PICK_COMPLETE
|
|
|
|
default: // PICK_COMPLETE
|
|
|
|
// Handle drops.
|
|
|
|
// Handle drops.
|
|
|
|
if (GPR_UNLIKELY(result.subchannel == nullptr)) { |
|
|
|
if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) { |
|
|
|
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
"Call dropped by load balancing policy"); |
|
|
|
"Call dropped by load balancing policy"); |
|
|
|
} else { |
|
|
|
|
|
|
|
// Grab a ref to the connected subchannel while we're still
|
|
|
|
|
|
|
|
// holding the data plane combiner.
|
|
|
|
|
|
|
|
calld->connected_subchannel_ = |
|
|
|
|
|
|
|
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get()); |
|
|
|
|
|
|
|
GPR_ASSERT(calld->connected_subchannel_ != nullptr); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
calld->connected_subchannel_ = std::move(result.connected_subchannel); |
|
|
|
calld->lb_recv_trailing_metadata_ready_ = |
|
|
|
calld->lb_recv_trailing_metadata_ready_ = |
|
|
|
result.recv_trailing_metadata_ready; |
|
|
|
result.recv_trailing_metadata_ready; |
|
|
|
calld->lb_recv_trailing_metadata_ready_user_data_ = |
|
|
|
calld->lb_recv_trailing_metadata_ready_user_data_ = |
|
|
|