Merge pull request #6724 from nathanielmanistaatgoogle/paused

Add paused method to PauseFailControl
pull/6667/head
Jan Tattermusch 9 years ago
commit f7d95c39ee
  1. 26
      src/python/grpcio/tests/unit/framework/common/test_control.py

@ -60,10 +60,16 @@ class Control(six.with_metaclass(abc.ABCMeta)):
class PauseFailControl(Control):
"""A Control that can be used to pause or fail code under control."""
"""A Control that can be used to pause or fail code under control.
This object is only safe for use from two threads: one of the system under
test calling control and the other from the test system calling pause,
block_until_paused, and fail.
"""
def __init__(self):
self._condition = threading.Condition()
self._pause = False
self._paused = False
self._fail = False
@ -72,19 +78,31 @@ class PauseFailControl(Control):
if self._fail:
raise Defect()
while self._paused:
while self._pause:
self._paused = True
self._condition.notify_all()
self._condition.wait()
self._paused = False
@contextlib.contextmanager
def pause(self):
"""Pauses code under control while controlling code is in context."""
with self._condition:
self._paused = True
self._pause = True
yield
with self._condition:
self._paused = False
self._pause = False
self._condition.notify_all()
def block_until_paused(self):
"""Blocks controlling code until code under control is paused.
May only be called within the context of a pause call.
"""
with self._condition:
while not self._paused:
self._condition.wait()
@contextlib.contextmanager
def fail(self):
"""Fails code under control while controlling code is in context."""

Loading…
Cancel
Save