Merge pull request #15229 from kpayson64/remove_cleanup_thread

Remove CleanupThread
pull/15268/head
kpayson64 7 years ago committed by GitHub
commit 84d44071b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      src/python/grpcio/grpc/_channel.py
  2. 34
      src/python/grpcio/grpc/_common.py
  3. 10
      src/python/grpcio/grpc/_server.py
  4. 7
      src/python/grpcio/grpc/beta/_server_adaptations.py
  5. 1
      src/python/grpcio_tests/tests/tests.json
  6. 115
      src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py

@ -209,19 +209,8 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer,
if operating: if operating:
state.due.add(cygrpc.OperationType.send_close_from_client) state.due.add(cygrpc.OperationType.send_close_from_client)
def stop_consumption_thread(timeout): # pylint: disable=unused-argument consumption_thread = threading.Thread(target=consume_request_iterator)
with state.condition: consumption_thread.daemon = True
if state.code is None:
code = grpc.StatusCode.CANCELLED
details = 'Consumption thread cleaned up!'
call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
details)
state.cancelled = True
_abort(state, code, details)
state.condition.notify_all()
consumption_thread = _common.CleanupThread(
stop_consumption_thread, target=consume_request_iterator)
consumption_thread.start() consumption_thread.start()
@ -671,13 +660,8 @@ def _run_channel_spin_thread(state):
if state.managed_calls == 0: if state.managed_calls == 0:
return return
def stop_channel_spin(timeout): # pylint: disable=unused-argument channel_spin_thread = threading.Thread(target=channel_spin)
with state.lock: channel_spin_thread.daemon = True
state.channel.close(cygrpc.StatusCode.cancelled,
'Channel spin thread cleaned up!')
channel_spin_thread = _common.CleanupThread(
stop_channel_spin, target=channel_spin)
channel_spin_thread.start() channel_spin_thread.start()
@ -820,10 +804,10 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect): def _subscribe(state, callback, try_to_connect):
with state.lock: with state.lock:
if not state.callbacks_and_connectivities and not state.polling: if not state.callbacks_and_connectivities and not state.polling:
polling_thread = _common.CleanupThread( polling_thread = threading.Thread(
lambda timeout: _moot(state),
target=_poll_connectivity, target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect))) args=(state, state.channel, bool(try_to_connect)))
polling_thread.daemon = True
polling_thread.start() polling_thread.start()
state.polling = True state.polling = True
state.callbacks_and_connectivities.append([callback, None]) state.callbacks_and_connectivities.append([callback, None])

@ -14,8 +14,6 @@
"""Shared implementation.""" """Shared implementation."""
import logging import logging
import threading
import time
import six import six
@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer):
def fully_qualified_method(group, method): def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method) return '/{}/{}'.format(group, method)
class CleanupThread(threading.Thread):
"""A threading.Thread subclass supporting custom behavior on join().
On Python Interpreter exit, Python will attempt to join outstanding threads
prior to garbage collection. We may need to do additional cleanup, and
we accomplish this by overriding the join() method.
"""
def __init__(self, behavior, *args, **kwargs):
"""Constructor.
Args:
behavior (function): Function called on join() with a single
argument, timeout, indicating the maximum duration of
`behavior`, or None indicating `behavior` has no deadline.
`behavior` must be idempotent.
args: Positional arguments passed to threading.Thread constructor.
kwargs: Keyword arguments passed to threading.Thread constructor.
"""
super(CleanupThread, self).__init__(*args, **kwargs)
self._behavior = behavior
def join(self, timeout=None):
start_time = time.time()
self._behavior(timeout)
end_time = time.time()
if timeout is not None:
timeout -= end_time - start_time
timeout = max(timeout, 0)
super(CleanupThread, self).join(timeout)

@ -780,14 +780,8 @@ def _start(state):
state.stage = _ServerStage.STARTED state.stage = _ServerStage.STARTED
_request_call(state) _request_call(state)
def cleanup_server(timeout): thread = threading.Thread(target=_serve, args=(state,))
if timeout is None: thread.daemon = True
_stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
else:
_stop(state, timeout).wait()
thread = _common.CleanupThread(
cleanup_server, target=_serve, args=(state,))
thread.start() thread.start()

@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer,
return return
request_consumer.terminate() request_consumer.terminate()
def stop_request_pipe(timeout): # pylint: disable=unused-argument request_pipe_thread = threading.Thread(target=pipe_requests)
thread_joined.set() request_pipe_thread.daemon = True
request_pipe_thread = _common.CleanupThread(
stop_request_pipe, target=pipe_requests)
request_pipe_thread.start() request_pipe_thread.start()

@ -53,7 +53,6 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
"unit._thread_cleanup_test.CleanupThreadTest",
"unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest",

@ -1,115 +0,0 @@
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for CleanupThread."""
import threading
import time
import unittest
from grpc import _common
_SHORT_TIME = 0.5
_LONG_TIME = 5.0
_EPSILON = 0.5
def cleanup(timeout):
if timeout is not None:
time.sleep(timeout)
else:
time.sleep(_LONG_TIME)
def slow_cleanup(timeout):
# Don't respect timeout
time.sleep(_LONG_TIME)
class CleanupThreadTest(unittest.TestCase):
def testTargetInvocation(self):
event = threading.Event()
def target(arg1, arg2, arg3=None):
self.assertEqual('arg1', arg1)
self.assertEqual('arg2', arg2)
self.assertEqual('arg3', arg3)
event.set()
cleanup_thread = _common.CleanupThread(
behavior=lambda x: None,
target=target,
name='test-name',
args=('arg1', 'arg2'),
kwargs={
'arg3': 'arg3'
})
cleanup_thread.start()
cleanup_thread.join()
self.assertEqual(cleanup_thread.name, 'test-name')
self.assertTrue(event.is_set())
def testJoinNoTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join()
end_time = time.time()
self.assertAlmostEqual(
_LONG_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_SHORT_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeoutSlowBehavior(self):
cleanup_thread = _common.CleanupThread(behavior=slow_cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_LONG_TIME, end_time - start_time, delta=_EPSILON)
def testJoinTimeoutSlowTarget(self):
event = threading.Event()
def target():
event.wait(_LONG_TIME)
cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(_SHORT_TIME)
end_time = time.time()
self.assertAlmostEqual(
_SHORT_TIME, end_time - start_time, delta=_EPSILON)
event.set()
def testJoinZeroTimeout(self):
cleanup_thread = _common.CleanupThread(behavior=cleanup)
cleanup_thread.start()
start_time = time.time()
cleanup_thread.join(0)
end_time = time.time()
self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON)
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save