|
|
@ -966,11 +966,6 @@ def _poll_connectivity(state, channel, initial_try_to_connect): |
|
|
|
_spawn_delivery(state, callbacks) |
|
|
|
_spawn_delivery(state, callbacks) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _moot(state): |
|
|
|
|
|
|
|
with state.lock: |
|
|
|
|
|
|
|
del state.callbacks_and_connectivities[:] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _subscribe(state, callback, try_to_connect): |
|
|
|
def _subscribe(state, callback, try_to_connect): |
|
|
|
with state.lock: |
|
|
|
with state.lock: |
|
|
|
if not state.callbacks_and_connectivities and not state.polling: |
|
|
|
if not state.callbacks_and_connectivities and not state.polling: |
|
|
@ -1000,11 +995,6 @@ def _unsubscribe(state, callback): |
|
|
|
break |
|
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unsubscribe_all(state): |
|
|
|
|
|
|
|
with state.lock: |
|
|
|
|
|
|
|
del state.callbacks_and_connectivities[:] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _augment_options(base_options, compression): |
|
|
|
def _augment_options(base_options, compression): |
|
|
|
compression_option = _compression.create_channel_option(compression) |
|
|
|
compression_option = _compression.create_channel_option(compression) |
|
|
|
return tuple(base_options) + compression_option + (( |
|
|
|
return tuple(base_options) + compression_option + (( |
|
|
@ -1071,16 +1061,21 @@ class Channel(grpc.Channel): |
|
|
|
self._channel, _channel_managed_call_management(self._call_state), |
|
|
|
self._channel, _channel_managed_call_management(self._call_state), |
|
|
|
_common.encode(method), request_serializer, response_deserializer) |
|
|
|
_common.encode(method), request_serializer, response_deserializer) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unsubscribe_all(self): |
|
|
|
|
|
|
|
state = self._connectivity_state |
|
|
|
|
|
|
|
if state: |
|
|
|
|
|
|
|
with state.lock: |
|
|
|
|
|
|
|
del state.callbacks_and_connectivities[:] |
|
|
|
|
|
|
|
|
|
|
|
def _close(self): |
|
|
|
def _close(self): |
|
|
|
_unsubscribe_all(self._connectivity_state) |
|
|
|
self._unsubscribe_all() |
|
|
|
self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') |
|
|
|
self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') |
|
|
|
cygrpc.fork_unregister_channel(self) |
|
|
|
cygrpc.fork_unregister_channel(self) |
|
|
|
_moot(self._connectivity_state) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _close_on_fork(self): |
|
|
|
def _close_on_fork(self): |
|
|
|
|
|
|
|
self._unsubscribe_all() |
|
|
|
self._channel.close_on_fork(cygrpc.StatusCode.cancelled, |
|
|
|
self._channel.close_on_fork(cygrpc.StatusCode.cancelled, |
|
|
|
'Channel closed due to fork') |
|
|
|
'Channel closed due to fork') |
|
|
|
_moot(self._connectivity_state) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __enter__(self): |
|
|
|
def __enter__(self): |
|
|
|
return self |
|
|
|
return self |
|
|
@ -1102,7 +1097,9 @@ class Channel(grpc.Channel): |
|
|
|
# for as long as they are in use and to close them after using them, |
|
|
|
# for as long as they are in use and to close them after using them, |
|
|
|
# then deletion of this grpc._channel.Channel instance can be made to |
|
|
|
# then deletion of this grpc._channel.Channel instance can be made to |
|
|
|
# effect closure of the underlying cygrpc.Channel instance. |
|
|
|
# effect closure of the underlying cygrpc.Channel instance. |
|
|
|
# This prevent the failed-at-initializing object removal from failing. |
|
|
|
try: |
|
|
|
# Though the __init__ failed, the removal will still trigger __del__. |
|
|
|
self._unsubscribe_all() |
|
|
|
if _moot is not None and hasattr(self, '_connectivity_state'): |
|
|
|
except: |
|
|
|
_moot(self._connectivity_state) |
|
|
|
# Exceptions in __del__ are ignored by Python anyway, but they can |
|
|
|
|
|
|
|
# keep spamming logs. Just silence them. |
|
|
|
|
|
|
|
pass |
|
|
|