Remove CleanupThread

This is no longer needed with the addition of a close() API that allows
clean shutdown.
pull/15229/head
kpayson64 7 years ago
parent 6dcb2f7333
commit a0bc0ac169
  1. 26
      src/python/grpcio/grpc/_channel.py
  2. 34
      src/python/grpcio/grpc/_common.py
  3. 10
      src/python/grpcio/grpc/_server.py
  4. 7
      src/python/grpcio/grpc/beta/_server_adaptations.py
  5. 1
      src/python/grpcio_tests/tests/tests.json
  6. 115
      src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py

@ -222,16 +222,8 @@ def _consume_request_iterator(request_iterator, state, call,
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
def stop_consumption_thread(timeout): # pylint: disable=unused-argument
with state.condition:
if state.code is None:
call.cancel()
state.cancelled = True
_abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!')
state.condition.notify_all()
consumption_thread = _common.CleanupThread(
stop_consumption_thread, target=consume_request_iterator)
consumption_thread = threading.Thread(target=consume_request_iterator)
consumption_thread.daemon = True
consumption_thread.start()
@ -706,14 +698,8 @@ def _run_channel_spin_thread(state):
state.managed_calls = None
return
def stop_channel_spin(timeout): # pylint: disable=unused-argument
with state.lock:
if state.managed_calls is not None:
for call in state.managed_calls:
call.cancel()
channel_spin_thread = _common.CleanupThread(
stop_channel_spin, target=channel_spin)
channel_spin_thread = threading.Thread(target=channel_spin)
channel_spin_thread.daemon = True
channel_spin_thread.start()
@ -855,10 +841,10 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
polling_thread = _common.CleanupThread(
lambda timeout: _moot(state),
polling_thread = threading.Thread(
target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
polling_thread.daemon = True
polling_thread.start()
state.polling = True
state.callbacks_and_connectivities.append([callback, None])

@ -14,8 +14,6 @@
"""Shared implementation."""
import logging
import threading
import time
import six
@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer):
def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method)
class CleanupThread(threading.Thread):
"""A threading.Thread subclass supporting custom behavior on join().
On Python Interpreter exit, Python will attempt to join outstanding threads
prior to garbage collection. We may need to do additional cleanup, and
we accomplish this by overriding the join() method.
"""
def __init__(self, behavior, *args, **kwargs):
"""Constructor.
Args:
behavior (function): Function called on join() with a single
argument, timeout, indicating the maximum duration of
`behavior`, or None indicating `behavior` has no deadline.
`behavior` must be idempotent.
args: Positional arguments passed to threading.Thread constructor.
kwargs: Keyword arguments passed to threading.Thread constructor.
"""
super(CleanupThread, self).__init__(*args, **kwargs)
self._behavior = behavior
def join(self, timeout=None):
start_time = time.time()
self._behavior(timeout)
end_time = time.time()
if timeout is not None:
timeout -= end_time - start_time
timeout = max(timeout, 0)
super(CleanupThread, self).join(timeout)

@ -780,14 +780,8 @@ def _start(state):
state.stage = _ServerStage.STARTED
_request_call(state)
def cleanup_server(timeout):
if timeout is None:
_stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
else:
_stop(state, timeout).wait()
thread = _common.CleanupThread(
cleanup_server, target=_serve, args=(state,))
thread = threading.Thread(target=_serve, args=(state,))
thread.daemon = True
thread.start()

@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer,
return
request_consumer.terminate()
def stop_request_pipe(timeout): # pylint: disable=unused-argument
thread_joined.set()
request_pipe_thread = _common.CleanupThread(
stop_request_pipe, target=pipe_requests)
request_pipe_thread = threading.Thread(target=pipe_requests)
request_pipe_thread.daemon = True
request_pipe_thread.start()

@ -52,7 +52,6 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
"unit._thread_cleanup_test.CleanupThreadTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",

@ -1,115 +0,0 @@
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for CleanupThread."""
import threading
import time
import unittest
from grpc import _common
_SHORT_TIME = 0.5
_LONG_TIME = 5.0
_EPSILON = 0.5
def cleanup(timeout):
if timeout is not None:
time.sleep(timeout)
else:
time.sleep(_LONG_TIME)
def slow_cleanup(timeout):
# Don't respect timeout
time.sleep(_LONG_TIME)
class CleanupThreadTest(unittest.TestCase):
def testTargetInvocation(self):
event = threading.Event()
def target(arg1, arg2, arg3=None):
self.assertEqual('arg1', arg1)
self.assertEqual('arg2', arg2)
self.assertEqual('arg3', arg3)
event.set()
cleanup_thread = _common.CleanupThread(
behavior=lambda x: None,
target=target,
name='test-name',
args=('arg1', 'arg2'),
kwargs={
'arg3': 'arg3'
})
cleanup_thread.start()
cleanup_thread.join()
self.assertEqual(cleanup_thread.name, 'test-name')
self.assertTrue(event.is_set())
def testJoinNoTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join()
end_time = time.time()
self.assertAlmostEqual(
_LONG_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_SHORT_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeoutSlowBehavior(self):
cleanup_thread = _common.CleanupThread(behavior=slow_cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_LONG_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeoutSlowTarget(self):
event = threading.Event()
def target():
event.wait(_LONG_TIME)
cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_SHORT_TIME, end_time - start_time, delta=_EPSILON)
event.set()
def testJoinZeroTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(0)
end_time = time.time()
self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON)
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save