@ -688,10 +688,93 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState(
return state ;
}
void ClientChannel : : WatchConnectivityState ( grpc_connectivity_state , Timestamp ,
grpc_completion_queue * , void * ) {
// TODO(ctiller): implement
Crash ( " not implemented " ) ;
namespace {
// A fire-and-forget object to handle external connectivity state watches.
class ExternalStateWatcher : public RefCounted < ExternalStateWatcher > {
public :
ExternalStateWatcher ( WeakRefCountedPtr < ClientChannel > channel ,
grpc_completion_queue * cq , void * tag ,
grpc_connectivity_state last_observed_state ,
Timestamp deadline )
: channel_ ( std : : move ( channel ) ) , cq_ ( cq ) , tag_ ( tag ) {
MutexLock lock ( & mu_ ) ;
// Start watch. This inherits the ref from creation.
auto watcher =
MakeOrphanable < Watcher > ( RefCountedPtr < ExternalStateWatcher > ( this ) ) ;
watcher_ = watcher . get ( ) ;
channel_ - > AddConnectivityWatcher ( last_observed_state , std : : move ( watcher ) ) ;
// Start timer. This takes a second ref.
const Duration timeout = deadline - Timestamp : : Now ( ) ;
timer_handle_ =
channel_ - > event_engine ( ) - > RunAfter ( timeout , [ self = Ref ( ) ] ( ) mutable {
ApplicationCallbackExecCtx callback_exec_ctx ;
ExecCtx exec_ctx ;
self - > MaybeStartCompletion ( absl : : DeadlineExceededError (
" Timed out waiting for connection state change " ) ) ;
// ExternalStateWatcher deletion might require an active ExecCtx.
self . reset ( ) ;
} ) ;
}
private :
class Watcher : public AsyncConnectivityStateWatcherInterface {
public :
explicit Watcher ( RefCountedPtr < ExternalStateWatcher > external_state_watcher )
: external_state_watcher_ ( std : : move ( external_state_watcher ) ) { }
void OnConnectivityStateChange ( grpc_connectivity_state /*new_state*/ ,
const absl : : Status & /*status*/ ) override {
external_state_watcher_ - > MaybeStartCompletion ( absl : : OkStatus ( ) ) ;
}
private :
RefCountedPtr < ExternalStateWatcher > external_state_watcher_ ;
} ;
// This is called both when the watch reports a new connectivity state
// and when the timer fires. It will trigger a CQ notification only
// on the first call. Subsequent calls will be ignored, because
// events can come in asynchronously.
void MaybeStartCompletion ( absl : : Status status ) {
MutexLock lock ( & mu_ ) ;
if ( watcher_ = = nullptr ) return ; // Ignore subsequent notifications.
// Cancel watch.
channel_ - > RemoveConnectivityWatcher ( watcher_ ) ;
watcher_ = nullptr ;
// Cancel timer.
channel_ - > event_engine ( ) - > Cancel ( timer_handle_ ) ;
// Send CQ completion.
Ref ( ) . release ( ) ; // Released in FinishedCompletion().
grpc_cq_end_op ( cq_ , tag_ , status , FinishedCompletion , this ,
& completion_storage_ ) ;
}
// Called when the completion is returned to the CQ.
static void FinishedCompletion ( void * arg , grpc_cq_completion * /*ignored*/ ) {
auto * self = static_cast < ExternalStateWatcher * > ( arg ) ;
self - > Unref ( ) ;
}
WeakRefCountedPtr < ClientChannel > channel_ ;
Mutex mu_ ;
grpc_completion_queue * cq_ ABSL_GUARDED_BY ( & mu_ ) ;
void * tag_ ABSL_GUARDED_BY ( & mu_ ) ;
grpc_cq_completion completion_storage_ ABSL_GUARDED_BY ( & mu_ ) ;
Watcher * watcher_ ABSL_GUARDED_BY ( & mu_ ) = nullptr ;
grpc_event_engine : : experimental : : EventEngine : : TaskHandle timer_handle_
ABSL_GUARDED_BY ( & mu_ ) ;
} ;
} // namespace
void ClientChannel : : WatchConnectivityState ( grpc_connectivity_state state ,
Timestamp deadline ,
grpc_completion_queue * cq ,
void * tag ) {
new ExternalStateWatcher ( WeakRefAsSubclass < ClientChannel > ( ) , cq , tag , state ,
deadline ) ;
}
void ClientChannel : : AddConnectivityWatcher (