From a0bc0ac169ecae34b5a7a43cb5e0fdfc9662d22a Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Mon, 30 Apr 2018 09:06:31 -0700 Subject: [PATCH] Remove CleanupThread This is no longer needed with the addition of a close() API that allows clean shutdown. --- src/python/grpcio/grpc/_channel.py | 26 +--- src/python/grpcio/grpc/_common.py | 34 ------ src/python/grpcio/grpc/_server.py | 10 +- .../grpcio/grpc/beta/_server_adaptations.py | 7 +- src/python/grpcio_tests/tests/tests.json | 1 - .../tests/unit/_thread_cleanup_test.py | 115 ------------------ 6 files changed, 10 insertions(+), 183 deletions(-) delete mode 100644 src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 2eff08aa573..1e4a99d908f 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -222,16 +222,8 @@ def _consume_request_iterator(request_iterator, state, call, call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) - def stop_consumption_thread(timeout): # pylint: disable=unused-argument - with state.condition: - if state.code is None: - call.cancel() - state.cancelled = True - _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!') - state.condition.notify_all() - - consumption_thread = _common.CleanupThread( - stop_consumption_thread, target=consume_request_iterator) + consumption_thread = threading.Thread(target=consume_request_iterator) + consumption_thread.daemon = True consumption_thread.start() @@ -706,14 +698,8 @@ def _run_channel_spin_thread(state): state.managed_calls = None return - def stop_channel_spin(timeout): # pylint: disable=unused-argument - with state.lock: - if state.managed_calls is not None: - for call in state.managed_calls: - call.cancel() - - channel_spin_thread = _common.CleanupThread( - stop_channel_spin, target=channel_spin) + channel_spin_thread = threading.Thread(target=channel_spin) + channel_spin_thread.daemon = True channel_spin_thread.start() @@ -855,10 +841,10 @@ def _moot(state): def _subscribe(state, callback, try_to_connect): with state.lock: if not state.callbacks_and_connectivities and not state.polling: - polling_thread = _common.CleanupThread( - lambda timeout: _moot(state), + polling_thread = threading.Thread( target=_poll_connectivity, args=(state, state.channel, bool(try_to_connect))) + polling_thread.daemon = True polling_thread.start() state.polling = True state.callbacks_and_connectivities.append([callback, None]) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index bbb69ad4893..862987a0cd1 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -14,8 +14,6 @@ """Shared implementation.""" import logging -import threading -import time import six @@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer): def fully_qualified_method(group, method): return '/{}/{}'.format(group, method) - - -class CleanupThread(threading.Thread): - """A threading.Thread subclass supporting custom behavior on join(). - - On Python Interpreter exit, Python will attempt to join outstanding threads - prior to garbage collection. We may need to do additional cleanup, and - we accomplish this by overriding the join() method. - """ - - def __init__(self, behavior, *args, **kwargs): - """Constructor. - - Args: - behavior (function): Function called on join() with a single - argument, timeout, indicating the maximum duration of - `behavior`, or None indicating `behavior` has no deadline. - `behavior` must be idempotent. - args: Positional arguments passed to threading.Thread constructor. - kwargs: Keyword arguments passed to threading.Thread constructor. - """ - super(CleanupThread, self).__init__(*args, **kwargs) - self._behavior = behavior - - def join(self, timeout=None): - start_time = time.time() - self._behavior(timeout) - end_time = time.time() - if timeout is not None: - timeout -= end_time - start_time - timeout = max(timeout, 0) - super(CleanupThread, self).join(timeout) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index c988e0c87ce..d849cadbeea 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -780,14 +780,8 @@ def _start(state): state.stage = _ServerStage.STARTED _request_call(state) - def cleanup_server(timeout): - if timeout is None: - _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() - else: - _stop(state, timeout).wait() - - thread = _common.CleanupThread( - cleanup_server, target=_serve, args=(state,)) + thread = threading.Thread(target=_serve, args=(state,)) + thread.daemon = True thread.start() diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 3c04fd76396..ccafec8951d 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer, return request_consumer.terminate() - def stop_request_pipe(timeout): # pylint: disable=unused-argument - thread_joined.set() - - request_pipe_thread = _common.CleanupThread( - stop_request_pipe, target=pipe_requests) + request_pipe_thread = threading.Thread(target=pipe_requests) + request_pipe_thread.daemon = True request_pipe_thread.start() diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index d38ee517f09..724427adb69 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -52,7 +52,6 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", - "unit._thread_cleanup_test.CleanupThreadTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py deleted file mode 100644 index 18f5af058a6..00000000000 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for CleanupThread.""" - -import threading -import time -import unittest - -from grpc import _common - -_SHORT_TIME = 0.5 -_LONG_TIME = 5.0 -_EPSILON = 0.5 - - -def cleanup(timeout): - if timeout is not None: - time.sleep(timeout) - else: - time.sleep(_LONG_TIME) - - -def slow_cleanup(timeout): - # Don't respect timeout - time.sleep(_LONG_TIME) - - -class CleanupThreadTest(unittest.TestCase): - - def testTargetInvocation(self): - event = threading.Event() - - def target(arg1, arg2, arg3=None): - self.assertEqual('arg1', arg1) - self.assertEqual('arg2', arg2) - self.assertEqual('arg3', arg3) - event.set() - - cleanup_thread = _common.CleanupThread( - behavior=lambda x: None, - target=target, - name='test-name', - args=('arg1', 'arg2'), - kwargs={ - 'arg3': 'arg3' - }) - cleanup_thread.start() - cleanup_thread.join() - self.assertEqual(cleanup_thread.name, 'test-name') - self.assertTrue(event.is_set()) - - def testJoinNoTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join() - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowBehavior(self): - cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowTarget(self): - event = threading.Event() - - def target(): - event.wait(_LONG_TIME) - - cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - event.set() - - def testJoinZeroTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(0) - end_time = time.time() - self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) - - -if __name__ == '__main__': - unittest.main(verbosity=2)