From 59555d5b0ca4e1a46d6f087cac5a94e004b5df47 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 13 Jun 2019 15:48:30 -0700 Subject: [PATCH] Add timeout argument to wait_for_termination --- src/python/grpcio/grpc/__init__.py | 17 ++++++++++---- src/python/grpcio/grpc/_server.py | 6 ++--- .../unit/_server_wait_for_termination_test.py | 23 +++++++++++++++++-- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index d5fe01e4c88..e9622b4d44c 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1444,16 +1444,23 @@ class Server(six.with_metaclass(abc.ABCMeta)): """ raise NotImplementedError() - def wait_for_termination(self): + def wait_for_termination(self, timeout=None): """Block current thread until the server stops. This is an EXPERIMENTAL API. - The wait will not consume computational resources during blocking, and it - will block indefinitely. There are two ways to unblock: + The wait will not consume computational resources during blocking, and + it will block until one of the two following conditions are met: - 1) Calling `stop` on the server in another thread; - 2) The `__del__` of the server object is invoked. + 1) The server is stopped or terminated; + 2) A timeout occurs if timeout is not `None`. + + The timeout argument works in the same way as `threading.Event.wait()`. + https://docs.python.org/3/library/threading.html#threading.Event.wait + + Args: + timeout: A floating point number specifying a timeout for the + operation in seconds. """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index ee184adaf21..063b16b4791 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -958,16 +958,16 @@ class _Server(grpc.Server): def start(self): _start(self._state) - def wait_for_termination(self): + def wait_for_termination(self, timeout=None): termination_event = threading.Event() with self._state.lock: if self._state.stage is _ServerStage.STOPPED: - raise ValueError('Failed to wait for a stopped server.') + return else: self._state.shutdown_events.append(termination_event) - termination_event.wait() + termination_event.wait(timeout=timeout) def stop(self, grace): return _stop(self._state, grace) diff --git a/src/python/grpcio_tests/tests/unit/_server_wait_for_termination_test.py b/src/python/grpcio_tests/tests/unit/_server_wait_for_termination_test.py index 6a07dc7e3a4..c5c1f6da38e 100644 --- a/src/python/grpcio_tests/tests/unit/_server_wait_for_termination_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_wait_for_termination_test.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import division + import datetime from concurrent import futures import unittest @@ -25,9 +27,9 @@ from tests.unit.framework.common import test_constants _WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1) -def _block_on_waiting(server, termination_event): +def _block_on_waiting(server, termination_event, timeout=None): server.start() - server.wait_for_termination() + server.wait_for_termination(timeout=timeout) termination_event.set() @@ -68,6 +70,23 @@ class ServerWaitForTerminationTest(unittest.TestCase): termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) self.assertTrue(termination_event.is_set()) + def test_unblock_by_timeout(self): + termination_event = threading.Event() + server = grpc.server(futures.ThreadPoolExecutor()) + + wait_thread = threading.Thread( + target=_block_on_waiting, + args=( + server, + termination_event, + test_constants.SHORT_TIMEOUT / 2, + )) + wait_thread.daemon = True + wait_thread.start() + + termination_event.wait(timeout=test_constants.SHORT_TIMEOUT) + self.assertTrue(termination_event.is_set()) + if __name__ == '__main__': unittest.main(verbosity=2)