Add timeout argument to wait_for_termination

pull/19299/head
Lidi Zheng 6 years ago
parent 8f6ee97345
commit 59555d5b0c
  1. 17
      src/python/grpcio/grpc/__init__.py
  2. 6
      src/python/grpcio/grpc/_server.py
  3. 23
      src/python/grpcio_tests/tests/unit/_server_wait_for_termination_test.py

@ -1444,16 +1444,23 @@ class Server(six.with_metaclass(abc.ABCMeta)):
"""
raise NotImplementedError()
def wait_for_termination(self):
def wait_for_termination(self, timeout=None):
"""Block current thread until the server stops.
This is an EXPERIMENTAL API.
The wait will not consume computational resources during blocking, and it
will block indefinitely. There are two ways to unblock:
The wait will not consume computational resources during blocking, and
it will block until one of the two following conditions are met:
1) Calling `stop` on the server in another thread;
2) The `__del__` of the server object is invoked.
1) The server is stopped or terminated;
2) A timeout occurs if timeout is not `None`.
The timeout argument works in the same way as `threading.Event.wait()`.
https://docs.python.org/3/library/threading.html#threading.Event.wait
Args:
timeout: A floating point number specifying a timeout for the
operation in seconds.
"""
raise NotImplementedError()

@ -958,16 +958,16 @@ class _Server(grpc.Server):
def start(self):
_start(self._state)
def wait_for_termination(self):
def wait_for_termination(self, timeout=None):
termination_event = threading.Event()
with self._state.lock:
if self._state.stage is _ServerStage.STOPPED:
raise ValueError('Failed to wait for a stopped server.')
return
else:
self._state.shutdown_events.append(termination_event)
termination_event.wait()
termination_event.wait(timeout=timeout)
def stop(self, grace):
return _stop(self._state, grace)

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import datetime
from concurrent import futures
import unittest
@ -25,9 +27,9 @@ from tests.unit.framework.common import test_constants
_WAIT_FOR_BLOCKING = datetime.timedelta(seconds=1)
def _block_on_waiting(server, termination_event):
def _block_on_waiting(server, termination_event, timeout=None):
server.start()
server.wait_for_termination()
server.wait_for_termination(timeout=timeout)
termination_event.set()
@ -68,6 +70,23 @@ class ServerWaitForTerminationTest(unittest.TestCase):
termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
self.assertTrue(termination_event.is_set())
def test_unblock_by_timeout(self):
termination_event = threading.Event()
server = grpc.server(futures.ThreadPoolExecutor())
wait_thread = threading.Thread(
target=_block_on_waiting,
args=(
server,
termination_event,
test_constants.SHORT_TIMEOUT / 2,
))
wait_thread.daemon = True
wait_thread.start()
termination_event.wait(timeout=test_constants.SHORT_TIMEOUT)
self.assertTrue(termination_event.is_set())
if __name__ == '__main__':
unittest.main(verbosity=2)

Loading…
Cancel
Save