|
|
|
@ -233,7 +233,7 @@ class TestResult(enum.Enum): |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def maxlen() -> int: |
|
|
|
|
return 14 # len(UNEXPECTEDPASS) |
|
|
|
|
return 14 # len(UNEXPECTEDPASS) |
|
|
|
|
|
|
|
|
|
def is_ok(self) -> bool: |
|
|
|
|
return self in {TestResult.OK, TestResult.EXPECTEDFAIL} |
|
|
|
@ -267,7 +267,11 @@ class TestResult(enum.Enum): |
|
|
|
|
return str(self.colorize('>>> ')) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TYPE_TAPResult = T.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'] |
|
|
|
|
TYPE_TAPResult = T.Union['TAPParser.Test', |
|
|
|
|
'TAPParser.Error', |
|
|
|
|
'TAPParser.Version', |
|
|
|
|
'TAPParser.Plan', |
|
|
|
|
'TAPParser.Bailout'] |
|
|
|
|
|
|
|
|
|
class TAPParser: |
|
|
|
|
class Plan(T.NamedTuple): |
|
|
|
@ -1396,7 +1400,7 @@ class SingleTestRunner: |
|
|
|
|
return self.runobj |
|
|
|
|
|
|
|
|
|
async def _run_subprocess(self, args: T.List[str], *, |
|
|
|
|
stdout: int, stderr: int, |
|
|
|
|
stdout: T.Optional[int], stderr: T.Optional[int], |
|
|
|
|
env: T.Dict[str, str], cwd: T.Optional[str]) -> TestSubprocess: |
|
|
|
|
# Let gdb handle ^C instead of us |
|
|
|
|
if self.options.gdb: |
|
|
|
@ -1840,9 +1844,9 @@ class TestHarness: |
|
|
|
|
async def _run_tests(self, runners: T.List[SingleTestRunner]) -> None: |
|
|
|
|
semaphore = asyncio.Semaphore(self.options.num_processes) |
|
|
|
|
futures = deque() # type: T.Deque[asyncio.Future] |
|
|
|
|
running_tests = dict() # type: T.Dict[asyncio.Future, str] |
|
|
|
|
running_tests = dict() # type: T.Dict[asyncio.Future, str] |
|
|
|
|
interrupted = False |
|
|
|
|
ctrlc_times = deque(maxlen=MAX_CTRLC) # type: T.Deque[float] |
|
|
|
|
ctrlc_times = deque(maxlen=MAX_CTRLC) # type: T.Deque[float] |
|
|
|
|
|
|
|
|
|
async def run_test(test: SingleTestRunner) -> None: |
|
|
|
|
async with semaphore: |
|
|
|
|