|
|
|
@ -417,7 +417,7 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ReportResult(Resolver::Result result) override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
chand_->OnResolverResultChangedLocked(std::move(result)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -499,7 +499,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
void WatchConnectivityState( |
|
|
|
|
grpc_connectivity_state initial_state, |
|
|
|
|
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
auto& watcher_wrapper = watcher_map_[watcher.get()]; |
|
|
|
|
GPR_ASSERT(watcher_wrapper == nullptr); |
|
|
|
|
watcher_wrapper = new WatcherWrapper(std::move(watcher), |
|
|
|
@ -512,7 +512,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher) |
|
|
|
|
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
auto it = watcher_map_.find(watcher); |
|
|
|
|
GPR_ASSERT(it != watcher_map_.end()); |
|
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_, |
|
|
|
@ -565,10 +565,10 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
~WatcherWrapper() override { |
|
|
|
|
auto* parent = parent_.release(); // ref owned by lambda
|
|
|
|
|
parent->chand_->work_serializer_->Run( |
|
|
|
|
[parent]() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) { |
|
|
|
|
parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); |
|
|
|
|
}, |
|
|
|
|
[parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( |
|
|
|
|
*parent_->chand_->work_serializer_) { |
|
|
|
|
parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -581,11 +581,11 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
} |
|
|
|
|
Ref().release(); // ref owned by lambda
|
|
|
|
|
parent_->chand_->work_serializer_->Run( |
|
|
|
|
[this]() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) { |
|
|
|
|
ApplyUpdateInControlPlaneWorkSerializer(); |
|
|
|
|
Unref(); |
|
|
|
|
}, |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( |
|
|
|
|
*parent_->chand_->work_serializer_) { |
|
|
|
|
ApplyUpdateInControlPlaneWorkSerializer(); |
|
|
|
|
Unref(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -607,7 +607,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void ApplyUpdateInControlPlaneWorkSerializer() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p: processing connectivity change in work serializer " |
|
|
|
@ -667,7 +667,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
// CancelConnectivityStateWatch() with its watcher, we know the
|
|
|
|
|
// corresponding WrapperWatcher to cancel on the underlying subchannel.
|
|
|
|
|
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_ |
|
|
|
|
ABSL_GUARDED_BY(&ClientChannel::work_serializer_); |
|
|
|
|
ABSL_GUARDED_BY(*chand_->work_serializer_); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -697,7 +697,7 @@ ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher( |
|
|
|
|
} |
|
|
|
|
// Pass the ref from creating the object to Start().
|
|
|
|
|
chand_->work_serializer_->Run( |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
// The ref is passed to AddWatcherLocked().
|
|
|
|
|
AddWatcherLocked(); |
|
|
|
|
}, |
|
|
|
@ -747,7 +747,7 @@ void ClientChannel::ExternalConnectivityWatcher::Notify( |
|
|
|
|
// automatically remove all watchers in that case.
|
|
|
|
|
if (state != GRPC_CHANNEL_SHUTDOWN) { |
|
|
|
|
chand_->work_serializer_->Run( |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
RemoveWatcherLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
@ -763,7 +763,7 @@ void ClientChannel::ExternalConnectivityWatcher::Cancel() { |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED); |
|
|
|
|
// Hop back into the work_serializer to clean up.
|
|
|
|
|
chand_->work_serializer_->Run( |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
RemoveWatcherLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
@ -794,7 +794,7 @@ class ClientChannel::ConnectivityWatcherAdder { |
|
|
|
|
watcher_(std::move(watcher)) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder"); |
|
|
|
|
chand_->work_serializer_->Run( |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
AddWatcherLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
@ -802,7 +802,7 @@ class ClientChannel::ConnectivityWatcherAdder { |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void AddWatcherLocked() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_)); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder"); |
|
|
|
|
delete this; |
|
|
|
@ -824,7 +824,7 @@ class ClientChannel::ConnectivityWatcherRemover { |
|
|
|
|
: chand_(chand), watcher_(watcher) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); |
|
|
|
|
chand_->work_serializer_->Run( |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
RemoveWatcherLocked(); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
@ -832,7 +832,7 @@ class ClientChannel::ConnectivityWatcherRemover { |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void RemoveWatcherLocked() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
chand_->state_tracker_.RemoveWatcher(watcher_); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, |
|
|
|
|
"ConnectivityWatcherRemover"); |
|
|
|
@ -861,7 +861,7 @@ class ClientChannel::ClientChannelControlHelper |
|
|
|
|
|
|
|
|
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
|
|
|
|
ServerAddress address, const grpc_channel_args& args) override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
|
|
|
|
|
// Determine health check service name.
|
|
|
|
|
absl::optional<std::string> health_check_service_name; |
|
|
|
@ -932,7 +932,7 @@ class ClientChannel::ClientChannelControlHelper |
|
|
|
|
void UpdateState( |
|
|
|
|
grpc_connectivity_state state, const absl::Status& status, |
|
|
|
|
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
if (chand_->resolver_ == nullptr) return; // Shutting down.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
const char* extra = chand_->disconnect_error_ == GRPC_ERROR_NONE |
|
|
|
@ -950,7 +950,7 @@ class ClientChannel::ClientChannelControlHelper |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RequestReresolution() override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
if (chand_->resolver_ == nullptr) return; // Shutting down.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_); |
|
|
|
@ -963,7 +963,7 @@ class ClientChannel::ClientChannelControlHelper |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void AddTraceEvent(TraceSeverity severity, absl::string_view message) override |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { |
|
|
|
|
if (chand_->resolver_ == nullptr) return; // Shutting down.
|
|
|
|
|
if (chand_->channelz_node_ != nullptr) { |
|
|
|
|
chand_->channelz_node_->AddTraceEvent( |
|
|
|
@ -1663,7 +1663,7 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { |
|
|
|
|
&result, |
|
|
|
|
// Complete pick.
|
|
|
|
|
[op](LoadBalancingPolicy::PickResult::Complete* complete_pick) |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*ClientChannel::work_serializer_) { |
|
|
|
|
SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>( |
|
|
|
|
complete_pick->subchannel.get()); |
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel = |
|
|
|
@ -1755,7 +1755,7 @@ void ClientChannel::StartTransportOp(grpc_channel_element* elem, |
|
|
|
|
// Pop into control plane work_serializer for remaining ops.
|
|
|
|
|
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op"); |
|
|
|
|
chand->work_serializer_->Run( |
|
|
|
|
[chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) { |
|
|
|
|
[chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) { |
|
|
|
|
chand->StartTransportOpLocked(op); |
|
|
|
|
}, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
@ -1817,7 +1817,7 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState( |
|
|
|
|
if (out == GRPC_CHANNEL_IDLE && try_to_connect) { |
|
|
|
|
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); |
|
|
|
|
work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( |
|
|
|
|
work_serializer_) { TryToConnectLocked(); }, |
|
|
|
|
*work_serializer_) { TryToConnectLocked(); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
return out; |
|
|
|
@ -2323,7 +2323,7 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem, |
|
|
|
|
auto* chand = static_cast<ClientChannel*>(arg); |
|
|
|
|
chand->work_serializer_->Run( |
|
|
|
|
[chand]() |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) { |
|
|
|
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) { |
|
|
|
|
chand->CheckConnectivityState(/*try_to_connect=*/true); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, |
|
|
|
|
"CheckResolutionLocked"); |
|
|
|
|