|
|
|
@ -731,6 +731,10 @@ class JunitBuilder(TestLogger): |
|
|
|
|
|
|
|
|
|
class TestRun: |
|
|
|
|
TEST_NUM = 0 |
|
|
|
|
PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {} |
|
|
|
|
|
|
|
|
|
def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any: |
|
|
|
|
return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol]) |
|
|
|
|
|
|
|
|
|
def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str], |
|
|
|
|
name: str, timeout: T.Optional[int]): |
|
|
|
@ -756,80 +760,6 @@ class TestRun: |
|
|
|
|
self.starttime = time.time() |
|
|
|
|
self.cmd = cmd |
|
|
|
|
|
|
|
|
|
def complete_gtest(self, returncode: int, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
filename = '{}.xml'.format(self.test.name) |
|
|
|
|
if self.test.workdir: |
|
|
|
|
filename = os.path.join(self.test.workdir, filename) |
|
|
|
|
self.junit = et.parse(filename) |
|
|
|
|
self.complete_exitcode(returncode, stdo, stde) |
|
|
|
|
|
|
|
|
|
def complete_exitcode(self, returncode: int, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
if returncode == GNU_SKIP_RETURNCODE: |
|
|
|
|
res = TestResult.SKIP |
|
|
|
|
elif returncode == GNU_ERROR_RETURNCODE: |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
else: |
|
|
|
|
res = TestResult.FAIL if bool(returncode) else TestResult.OK |
|
|
|
|
self.complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
async def parse_tap(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: |
|
|
|
|
res = TestResult.OK |
|
|
|
|
error = '' |
|
|
|
|
|
|
|
|
|
async for i in TAPParser().parse_async(lines): |
|
|
|
|
if isinstance(i, TAPParser.Bailout): |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
elif isinstance(i, TAPParser.Test): |
|
|
|
|
self.results.append(i) |
|
|
|
|
if i.result.is_bad(): |
|
|
|
|
res = TestResult.FAIL |
|
|
|
|
elif isinstance(i, TAPParser.Error): |
|
|
|
|
error = '\nTAP parsing error: ' + i.message |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
|
|
|
|
|
if all(t.result is TestResult.SKIP for t in self.results): |
|
|
|
|
# This includes the case where self.results is empty |
|
|
|
|
res = TestResult.SKIP |
|
|
|
|
return res, error |
|
|
|
|
|
|
|
|
|
def complete_tap(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: str, stde: str) -> None: |
|
|
|
|
if returncode != 0 and not res.was_killed(): |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
stde += '\n(test program exited with status code {})'.format(returncode,) |
|
|
|
|
|
|
|
|
|
self.complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
async def parse_rust(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: |
|
|
|
|
def parse_res(n: int, name: str, result: str) -> TAPParser.Test: |
|
|
|
|
if result == 'ok': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.OK, None) |
|
|
|
|
elif result == 'ignored': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.SKIP, None) |
|
|
|
|
elif result == 'FAILED': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.FAIL, None) |
|
|
|
|
return TAPParser.Test(n, name, TestResult.ERROR, |
|
|
|
|
'Unsupported output from rust test: {}'.format(result)) |
|
|
|
|
|
|
|
|
|
n = 1 |
|
|
|
|
async for line in lines: |
|
|
|
|
if line.startswith('test ') and not line.startswith('test result'): |
|
|
|
|
_, name, _, result = line.rstrip().split(' ') |
|
|
|
|
name = name.replace('::', '.') |
|
|
|
|
self.results.append(parse_res(n, name, result)) |
|
|
|
|
n += 1 |
|
|
|
|
|
|
|
|
|
if all(t.result is TestResult.SKIP for t in self.results): |
|
|
|
|
# This includes the case where self.results is empty |
|
|
|
|
return TestResult.SKIP, '' |
|
|
|
|
elif any(t.result is TestResult.ERROR for t in self.results): |
|
|
|
|
return TestResult.ERROR, '' |
|
|
|
|
elif any(t.result is TestResult.FAIL for t in self.results): |
|
|
|
|
return TestResult.FAIL, '' |
|
|
|
|
return TestResult.OK, '' |
|
|
|
|
|
|
|
|
|
@property |
|
|
|
|
def num(self) -> int: |
|
|
|
|
if self._num is None: |
|
|
|
@ -853,8 +783,8 @@ class TestRun: |
|
|
|
|
return '{}/{} subtests passed'.format(passed, ran) |
|
|
|
|
return '' |
|
|
|
|
|
|
|
|
|
def complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
def _complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
assert isinstance(res, TestResult) |
|
|
|
|
if self.should_fail and res in (TestResult.OK, TestResult.FAIL): |
|
|
|
|
res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL |
|
|
|
@ -867,7 +797,11 @@ class TestRun: |
|
|
|
|
|
|
|
|
|
def complete_skip(self, message: str) -> None: |
|
|
|
|
self.starttime = time.time() |
|
|
|
|
self.complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None) |
|
|
|
|
self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None) |
|
|
|
|
|
|
|
|
|
def complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
self._complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
def get_log(self) -> str: |
|
|
|
|
res = '--- command ---\n' |
|
|
|
@ -904,6 +838,118 @@ class TestRun: |
|
|
|
|
log += '\n'.join(lines[-100:]) |
|
|
|
|
return log |
|
|
|
|
|
|
|
|
|
@property |
|
|
|
|
def needs_parsing(self) -> bool: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: |
|
|
|
|
async for l in lines: |
|
|
|
|
pass |
|
|
|
|
return TestResult.OK, '' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunExitCode(TestRun): |
|
|
|
|
|
|
|
|
|
def complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
if res: |
|
|
|
|
pass |
|
|
|
|
elif returncode == GNU_SKIP_RETURNCODE: |
|
|
|
|
res = TestResult.SKIP |
|
|
|
|
elif returncode == GNU_ERROR_RETURNCODE: |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
else: |
|
|
|
|
res = TestResult.FAIL if bool(returncode) else TestResult.OK |
|
|
|
|
super().complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunGTest(TestRunExitCode): |
|
|
|
|
def complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: T.Optional[str], stde: T.Optional[str]) -> None: |
|
|
|
|
filename = '{}.xml'.format(self.test.name) |
|
|
|
|
if self.test.workdir: |
|
|
|
|
filename = os.path.join(self.test.workdir, filename) |
|
|
|
|
|
|
|
|
|
self.junit = et.parse(filename) |
|
|
|
|
super().complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunTAP(TestRun): |
|
|
|
|
@property |
|
|
|
|
def needs_parsing(self) -> bool: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
def complete(self, returncode: int, res: TestResult, |
|
|
|
|
stdo: str, stde: str) -> None: |
|
|
|
|
if returncode != 0 and not res.was_killed(): |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
stde += '\n(test program exited with status code {})'.format(returncode,) |
|
|
|
|
|
|
|
|
|
super().complete(returncode, res, stdo, stde) |
|
|
|
|
|
|
|
|
|
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: |
|
|
|
|
res = TestResult.OK |
|
|
|
|
error = '' |
|
|
|
|
|
|
|
|
|
async for i in TAPParser().parse_async(lines): |
|
|
|
|
if isinstance(i, TAPParser.Bailout): |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
elif isinstance(i, TAPParser.Test): |
|
|
|
|
self.results.append(i) |
|
|
|
|
if i.result.is_bad(): |
|
|
|
|
res = TestResult.FAIL |
|
|
|
|
elif isinstance(i, TAPParser.Error): |
|
|
|
|
error = '\nTAP parsing error: ' + i.message |
|
|
|
|
res = TestResult.ERROR |
|
|
|
|
|
|
|
|
|
if all(t.result is TestResult.SKIP for t in self.results): |
|
|
|
|
# This includes the case where self.results is empty |
|
|
|
|
res = TestResult.SKIP |
|
|
|
|
return res, error |
|
|
|
|
|
|
|
|
|
TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunRust(TestRun): |
|
|
|
|
@property |
|
|
|
|
def needs_parsing(self) -> bool: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: |
|
|
|
|
def parse_res(n: int, name: str, result: str) -> TAPParser.Test: |
|
|
|
|
if result == 'ok': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.OK, None) |
|
|
|
|
elif result == 'ignored': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.SKIP, None) |
|
|
|
|
elif result == 'FAILED': |
|
|
|
|
return TAPParser.Test(n, name, TestResult.FAIL, None) |
|
|
|
|
return TAPParser.Test(n, name, TestResult.ERROR, |
|
|
|
|
'Unsupported output from rust test: {}'.format(result)) |
|
|
|
|
|
|
|
|
|
n = 1 |
|
|
|
|
async for line in lines: |
|
|
|
|
if line.startswith('test ') and not line.startswith('test result'): |
|
|
|
|
_, name, _, result = line.rstrip().split(' ') |
|
|
|
|
name = name.replace('::', '.') |
|
|
|
|
self.results.append(parse_res(n, name, result)) |
|
|
|
|
n += 1 |
|
|
|
|
|
|
|
|
|
if all(t.result is TestResult.SKIP for t in self.results): |
|
|
|
|
# This includes the case where self.results is empty |
|
|
|
|
return TestResult.SKIP, '' |
|
|
|
|
elif any(t.result is TestResult.ERROR for t in self.results): |
|
|
|
|
return TestResult.ERROR, '' |
|
|
|
|
elif any(t.result is TestResult.FAIL for t in self.results): |
|
|
|
|
return TestResult.FAIL, '' |
|
|
|
|
return TestResult.OK, '' |
|
|
|
|
|
|
|
|
|
TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decode(stream: T.Union[None, bytes]) -> str: |
|
|
|
|
if stream is None: |
|
|
|
|
return '' |
|
|
|
@ -1154,7 +1200,7 @@ class SingleTestRunner: |
|
|
|
|
self.runobj.start(cmd) |
|
|
|
|
stdout = None |
|
|
|
|
stderr = None |
|
|
|
|
if self.test.protocol is TestProtocol.TAP: |
|
|
|
|
if self.runobj.needs_parsing: |
|
|
|
|
stdout = asyncio.subprocess.PIPE |
|
|
|
|
stderr = None if self.options.verbose else asyncio.subprocess.PIPE |
|
|
|
|
elif not self.options.verbose: |
|
|
|
@ -1192,10 +1238,9 @@ class SingleTestRunner: |
|
|
|
|
nonlocal stdo |
|
|
|
|
stdo = ''.join(stdo_lines) |
|
|
|
|
|
|
|
|
|
if self.test.protocol is TestProtocol.TAP: |
|
|
|
|
parse_task = self.runobj.parse_tap(lines()) |
|
|
|
|
elif self.test.protocol is TestProtocol.RUST: |
|
|
|
|
parse_task = self.runobj.parse_rust(lines()) |
|
|
|
|
parse_task = None |
|
|
|
|
if self.runobj.needs_parsing: |
|
|
|
|
parse_task = self.runobj.parse(lines()) |
|
|
|
|
elif stdout is not None: |
|
|
|
|
stdo_task = p.stdout.read(-1) |
|
|
|
|
if stderr is not None and stderr != asyncio.subprocess.STDOUT: |
|
|
|
@ -1216,16 +1261,8 @@ class SingleTestRunner: |
|
|
|
|
if error: |
|
|
|
|
stde += '\n' + error |
|
|
|
|
result = result or res |
|
|
|
|
if self.test.protocol is TestProtocol.TAP: |
|
|
|
|
self.runobj.complete_tap(returncode, result, stdo, stde) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
if result: |
|
|
|
|
self.runobj.complete(returncode, result, stdo, stde) |
|
|
|
|
elif self.test.protocol is TestProtocol.EXITCODE: |
|
|
|
|
self.runobj.complete_exitcode(returncode, stdo, stde) |
|
|
|
|
elif self.test.protocol is TestProtocol.GTEST: |
|
|
|
|
self.runobj.complete_gtest(returncode, stdo, stde) |
|
|
|
|
self.runobj.complete(returncode, result, stdo, stde) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestHarness: |
|
|
|
|