# SPDX-License-Identifier: Apache-2.0
# Copyright 2013-2014 The Meson development team

"""This is (mostly) a standalone module used to write logging
information about Meson runs. Some output goes to screen,
some to logging dir and some goes to both."""

from __future__ import annotations

import enum
import os
import io
import sys
import time
import platform
import shlex
import subprocess
import shutil
import typing as T
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path

if T.TYPE_CHECKING:
    from typing_extensions import Literal

    from ._typing import StringProtocol, SizedStringProtocol

    from .mparser import BaseNode

    TV_Loggable = T.Union[str, 'AnsiDecorator', StringProtocol]
    TV_LoggableList = T.List[TV_Loggable]


def is_windows() -> bool:
    """Return True when ``platform.system()`` reports Windows."""
    platname = platform.system().lower()
    return platname == 'windows'


def _windows_ansi() -> bool:
    """Try to enable ANSI escape processing on the Windows console.

    Returns True when virtual terminal processing could be enabled on the
    stdout console handle, or when the ``ANSICON`` environment variable is
    set to a non-empty value. Returns False when the console mode cannot be
    queried at all.
    """
    # windll only exists on windows, so mypy will get mad
    from ctypes import windll, byref  # type: ignore
    from ctypes.wintypes import DWORD

    kernel = windll.kernel32
    stdout = kernel.GetStdHandle(-11)
    mode = DWORD()
    if not kernel.GetConsoleMode(stdout, byref(mode)):
        return False
    # ENABLE_VIRTUAL_TERMINAL_PROCESSING == 0x4
    # If the call to enable VT processing fails (returns 0), we fallback to
    # original behavior
    return bool(kernel.SetConsoleMode(stdout, mode.value | 0x4) or os.environ.get('ANSICON'))


# Evaluated once at import time from the process environment.
_in_ci = 'CI' in os.environ
_ci_is_github = 'GITHUB_ACTIONS' in os.environ


class _Severity(enum.Enum):
    """Severity label used by ``_Logger._log_error`` to pick the message prefix."""

    NOTICE = enum.auto()
    WARNING = enum.auto()
    ERROR = enum.auto()
    DEPRECATION = enum.auto()


@dataclass
class _Logger:
    """Holds all logging state and implements the logging operations.

    A single module-level instance is created below and its bound methods are
    re-exported as module-level functions. Every method operates on ``self``
    only, so additional instances are independent of that singleton.
    """

    log_dir: T.Optional[str] = None
    # Stack of names pushed by nested(); the top entry prefixes console lines.
    log_depth: T.List[str] = field(default_factory=list)
    log_to_stderr: bool = False
    log_file: T.Optional[T.TextIO] = None
    log_timestamp_start: T.Optional[float] = None
    # NOTE: the unannotated attributes below are plain class-level defaults,
    # not dataclass fields, so they are not constructor parameters.
    log_fatal_warnings = False
    log_disable_stdout = False
    log_errors_only = False
    # Messages already emitted through _log_once(), keyed by their plain text.
    logged_once: T.Set[T.Tuple[str, ...]] = field(default_factory=set)
    log_warnings_counter = 0
    log_pager: T.Optional['subprocess.Popen'] = None

    _LOG_FNAME: T.ClassVar[str] = 'meson-log.txt'

    @contextmanager
    def no_logging(self) -> T.Iterator[None]:
        """Disable console output inside the ``with`` block.

        On exit console output is unconditionally re-enabled; the previous
        state is not restored.
        """
        self.log_disable_stdout = True
        try:
            yield
        finally:
            self.log_disable_stdout = False

    @contextmanager
    def force_logging(self) -> T.Iterator[None]:
        """Enable console output inside the ``with`` block, then restore the previous state."""
        restore = self.log_disable_stdout
        self.log_disable_stdout = False
        try:
            yield
        finally:
            self.log_disable_stdout = restore

    def set_quiet(self) -> None:
        """Only print messages flagged ``is_error`` to the console."""
        self.log_errors_only = True

    def set_verbose(self) -> None:
        """Print all messages to the console (undoes set_quiet)."""
        self.log_errors_only = False

    def set_timestamp_start(self, start: float) -> None:
        """Enable timestamps; ``start`` is subtracted from ``time.monotonic()`` when formatting."""
        self.log_timestamp_start = start

    def shutdown(self) -> T.Optional[str]:
        """Close the log file (if any) and stop the pager (if any).

        :return: the name of the log file that was closed, or None when no
            log file was open.
        """
        path: T.Optional[str] = None
        try:
            if self.log_file is not None:
                path = self.log_file.name
                # Clear the attribute before closing so that nothing attempts
                # to write to a file that is being closed.
                exception_around_goer = self.log_file
                self.log_file = None
                exception_around_goer.close()
        finally:
            # Always release the pager, even when a log file was open or
            # closing it failed; previously it was skipped on that path.
            self.stop_pager()
        return path

    def start_pager(self) -> None:
        """Start a pager subprocess and route console output through it.

        Does nothing when the console is not colorized or no pager can be
        found. The command comes from ``$PAGER`` if set, otherwise ``less``
        is looked up on ``PATH`` (and, on Windows, next to a ``git`` install).

        :raises MesonException: when the pager fails to start and ``$PAGER``
            was set by the user. Failures of an auto-detected pager are
            silently ignored.
        """
        if not self.colorize_console():
            return
        pager_cmd = []
        if 'PAGER' in os.environ:
            pager_cmd = shlex.split(os.environ['PAGER'])
        else:
            less = shutil.which('less')
            if not less and is_windows():
                git = shutil.which('git')
                if git:
                    path = Path(git).parents[1] / 'usr' / 'bin'
                    less = shutil.which('less', path=str(path))
            if less:
                pager_cmd = [less]
        if not pager_cmd:
            return
        try:
            # Set 'LESS' environment variable, rather than arguments in
            # pager_cmd, to also support the case where the user has 'PAGER'
            # set to 'less'. Arguments set are:
            # "R" : support color
            # "X" : do not clear the screen when leaving the pager
            # "F" : skip the pager if content fits into the screen
            env = os.environ.copy()
            if 'LESS' not in env:
                env['LESS'] = 'RXF'
            # Set "-c" for lv to support color
            if 'LV' not in env:
                env['LV'] = '-c'
            self.log_pager = subprocess.Popen(pager_cmd, stdin=subprocess.PIPE,
                                              text=True, encoding='utf-8', env=env)
        except Exception as e:
            # Ignore errors, unless it is a user defined pager.
            if 'PAGER' in os.environ:
                from .mesonlib import MesonException
                raise MesonException(f'Failed to start pager: {str(e)}') from e

    def stop_pager(self) -> None:
        """Flush and close the pager's stdin, wait for it to exit and forget it.

        ``OSError`` raised while flushing/closing the pipe is ignored.
        """
        if self.log_pager:
            try:
                self.log_pager.stdin.flush()
                self.log_pager.stdin.close()
            except OSError:
                pass
            self.log_pager.wait()
            self.log_pager = None

    def initialize(self, logdir: str, fatal_warnings: bool = False) -> None:
        """Open ``<logdir>/meson-log.txt`` for writing and record the fatal-warnings setting."""
        self.log_dir = logdir
        self.log_file = open(os.path.join(logdir, self._LOG_FNAME), 'w', encoding='utf-8')
        self.log_fatal_warnings = fatal_warnings

    def process_markup(self, args: T.Sequence[TV_Loggable], keep: bool, display_timestamp: bool = True) -> T.List[str]:
        """Convert loggable values to a list of plain strings.

        :param args: values to convert; ``None`` entries are skipped
        :param keep: whether AnsiDecorator values keep their escape codes
        :param display_timestamp: prepend ``[seconds]`` when timestamps are enabled
        :return: one string per retained argument, optionally preceded by the timestamp
        """
        arr: T.List[str] = []
        if self.log_timestamp_start is not None and display_timestamp:
            arr = ['[{:.3f}]'.format(time.monotonic() - self.log_timestamp_start)]
        for arg in args:
            if arg is None:
                continue
            if isinstance(arg, str):
                arr.append(arg)
            elif isinstance(arg, AnsiDecorator):
                arr.append(arg.get_text(keep))
            else:
                arr.append(str(arg))
        return arr

    def force_print(self, *args: str, nested: bool, sep: T.Optional[str] = None,
                    end: T.Optional[str] = None) -> None:
        """Write already-formatted strings to the console, pager or stderr.

        Does nothing while console output is disabled. When inside a
        ``nested()`` context each line is stripped and, if ``nested`` is true,
        prefixed with the innermost nesting name followed by ``'| '``.
        """
        if self.log_disable_stdout:
            return
        iostr = io.StringIO()
        print(*args, sep=sep, end=end, file=iostr)

        raw = iostr.getvalue()
        if self.log_depth:
            prepend = (self.log_depth[-1] + '| ') if nested else ''
            lines = []
            for l in raw.split('\n'):
                l = l.strip()
                # Blank lines stay blank rather than carrying a bare prefix.
                lines.append(prepend + l if l else '')
            raw = '\n'.join(lines)

        # _Something_ is going to get printed.
        if self.log_pager:
            output = self.log_pager.stdin
        elif self.log_to_stderr:
            output = sys.stderr
        else:
            output = sys.stdout
        try:
            print(raw, end='', file=output)
        except UnicodeEncodeError:
            # The stream cannot represent the text; degrade to ASCII with
            # replacement characters instead of failing.
            cleaned = raw.encode('ascii', 'replace').decode('ascii')
            print(cleaned, end='', file=output)

    def debug(self, *args: TV_Loggable, sep: T.Optional[str] = None,
              end: T.Optional[str] = None, display_timestamp: bool = True) -> None:
        """Write a message to the log file only (never to the console)."""
        # Use this instance's own markup processing, not the module-level
        # alias that is bound to the global logger.
        arr = self.process_markup(args, False, display_timestamp)
        if self.log_file is not None:
            print(*arr, file=self.log_file, sep=sep, end=end)
            self.log_file.flush()

    def _log(self, *args: TV_Loggable, is_error: bool = False,
             nested: bool = True, sep: T.Optional[str] = None,
             end: T.Optional[str] = None, display_timestamp: bool = True) -> None:
        """Write a message to the log file (plain) and to the console.

        The console copy keeps ANSI codes only when the console is colorized,
        and is skipped in quiet mode unless ``is_error`` is set.
        """
        arr = self.process_markup(args, False, display_timestamp)
        if self.log_file is not None:
            print(*arr, file=self.log_file, sep=sep, end=end)
            self.log_file.flush()
        if self.colorize_console():
            arr = self.process_markup(args, True, display_timestamp)
        if not self.log_errors_only or is_error:
            self.force_print(*arr, nested=nested, sep=sep, end=end)

    def _debug_log_cmd(self, cmd: str, args: T.List[str]) -> None:
        """Write a ``!meson_ci!/<cmd> "<arg>" ...`` line to the log file when ``CI`` is set."""
        if not _in_ci:
            return
        args = [f'"{x}"' for x in args]  # Quote all args, just in case
        self.debug('!meson_ci!/{} {}'.format(cmd, ' '.join(args)))

    def cmd_ci_include(self, file: str) -> None:
        """Emit a ``ci_include`` CI command for ``file`` (see _debug_log_cmd)."""
        self._debug_log_cmd('ci_include', [file])

    def log(self, *args: TV_Loggable, is_error: bool = False,
            once: bool = False, nested: bool = True,
            sep: T.Optional[str] = None,
            end: T.Optional[str] = None,
            display_timestamp: bool = True) -> None:
        """Log a message; with ``once=True`` identical messages are emitted only the first time."""
        if once:
            self._log_once(*args, is_error=is_error, nested=nested, sep=sep, end=end, display_timestamp=display_timestamp)
        else:
            self._log(*args, is_error=is_error, nested=nested, sep=sep, end=end, display_timestamp=display_timestamp)

    def log_timestamp(self, *args: TV_Loggable) -> None:
        """Log a message only when timestamps have been enabled."""
        # "is not None" matches the check in process_markup(), so a start
        # value of 0.0 still counts as enabled.
        if self.log_timestamp_start is not None:
            self.log(*args)

    def _log_once(self, *args: TV_Loggable, is_error: bool = False,
                  nested: bool = True, sep: T.Optional[str] = None,
                  end: T.Optional[str] = None, display_timestamp: bool = True) -> None:
        """Log variant that only prints a given message one time per meson invocation.

        This considers ansi decorated values by the values they wrap without
        regard for the AnsiDecorator itself.
        """
        def to_str(x: TV_Loggable) -> str:
            if isinstance(x, str):
                return x
            if isinstance(x, AnsiDecorator):
                return x.text
            return str(x)
        t = tuple(to_str(a) for a in args)
        if t in self.logged_once:
            return
        self.logged_once.add(t)
        self._log(*args, is_error=is_error, nested=nested, sep=sep, end=end, display_timestamp=display_timestamp)

    def _log_error(self, severity: _Severity, *rargs: TV_Loggable,
                   once: bool = False, fatal: bool = True,
                   location: T.Optional[BaseNode] = None,
                   nested: bool = True, sep: T.Optional[str] = None,
                   end: T.Optional[str] = None,
                   is_error: bool = True) -> None:
        """Log a message prefixed with a severity label and optional source location.

        Increments the warning counter for every severity.

        :raises MesonException: when fatal warnings are enabled and ``fatal`` is true
        """
        from .mesonlib import MesonException, relpath

        # The typing requirements here are non-obvious. Lists are invariant,
        # therefore T.List[A] and T.List[T.Union[A, B]] are not able to be joined
        if severity is _Severity.NOTICE:
            label: TV_LoggableList = [bold('NOTICE:')]
        elif severity is _Severity.WARNING:
            label = [yellow('WARNING:')]
        elif severity is _Severity.ERROR:
            label = [red('ERROR:')]
        elif severity is _Severity.DEPRECATION:
            label = [red('DEPRECATION:')]
        # rargs is a tuple, not a list
        args = label + list(rargs)

        if location is not None:
            location_file = relpath(location.filename, os.getcwd())
            location_str = get_error_location_string(location_file, location.lineno)
            # Unions are frankly awful, and we have to T.cast here to get mypy
            # to understand that the list concatenation is safe
            location_list = T.cast('TV_LoggableList', [location_str])
            args = location_list + args

        # Route through this instance rather than the module-level alias.
        self.log(*args, once=once, nested=nested, sep=sep, end=end, is_error=is_error)

        self.log_warnings_counter += 1

        if self.log_fatal_warnings and fatal:
            raise MesonException("Fatal warnings enabled, aborting")

    def error(self, *args: TV_Loggable,
              once: bool = False, fatal: bool = True,
              location: T.Optional[BaseNode] = None,
              nested: bool = True, sep: T.Optional[str] = None,
              end: T.Optional[str] = None) -> None:
        """Log a message with the ``ERROR:`` label (see _log_error)."""
        return self._log_error(_Severity.ERROR, *args, once=once, fatal=fatal, location=location,
                               nested=nested, sep=sep, end=end, is_error=True)

    def warning(self, *args: TV_Loggable,
                once: bool = False, fatal: bool = True,
                location: T.Optional[BaseNode] = None,
                nested: bool = True, sep: T.Optional[str] = None,
                end: T.Optional[str] = None) -> None:
        """Log a message with the ``WARNING:`` label (see _log_error)."""
        return self._log_error(_Severity.WARNING, *args, once=once, fatal=fatal, location=location,
                               nested=nested, sep=sep, end=end, is_error=True)

    def deprecation(self, *args: TV_Loggable,
                    once: bool = False, fatal: bool = True,
                    location: T.Optional[BaseNode] = None,
                    nested: bool = True, sep: T.Optional[str] = None,
                    end: T.Optional[str] = None) -> None:
        """Log a message with the ``DEPRECATION:`` label (see _log_error)."""
        return self._log_error(_Severity.DEPRECATION, *args, once=once, fatal=fatal, location=location,
                               nested=nested, sep=sep, end=end, is_error=True)

    def notice(self, *args: TV_Loggable,
               once: bool = False, fatal: bool = True,
               location: T.Optional[BaseNode] = None,
               nested: bool = True, sep: T.Optional[str] = None,
               end: T.Optional[str] = None) -> None:
        """Log a message with the ``NOTICE:`` label; not treated as an error in quiet mode."""
        return self._log_error(_Severity.NOTICE, *args, once=once, fatal=fatal, location=location,
                               nested=nested, sep=sep, end=end, is_error=False)

    def exception(self, e: Exception, prefix: T.Optional[AnsiDecorator] = None) -> None:
        """Log an exception, always to the console even if output is disabled.

        When the exception carries non-None ``file``, ``lineno`` and ``colno``
        attributes, a ``path:line:col:`` location relative to the current
        directory is printed first. ``prefix`` defaults to a red ``ERROR:``;
        a prefix with empty text is omitted.
        """
        if prefix is None:
            prefix = red('ERROR:')
        self.log()
        args: T.List[T.Union[AnsiDecorator, str]] = []
        if all(getattr(e, a, None) is not None for a in ['file', 'lineno', 'colno']):
            # Mypy doesn't follow hasattr, and it's pretty easy to visually inspect
            # that this is correct, so we'll just ignore it.
            path = get_relative_path(Path(e.file), Path(os.getcwd()))  # type: ignore
            args.append(f'{path}:{e.lineno}:{e.colno}:')  # type: ignore
        if prefix:
            args.append(prefix)
        args.append(str(e))

        with self.force_logging():
            self.log(*args, is_error=True)

    @contextmanager
    def nested(self, name: str = '') -> T.Generator[None, None, None]:
        """Push ``name`` on the nesting stack for the duration of the ``with`` block."""
        self.log_depth.append(name)
        try:
            yield
        finally:
            self.log_depth.pop()

    def get_log_dir(self) -> T.Optional[str]:
        """Return the log directory, or None if initialize() has not set one."""
        return self.log_dir

    def get_log_depth(self) -> int:
        """Return the current nesting depth."""
        return len(self.log_depth)

    @contextmanager
    def nested_warnings(self) -> T.Iterator[None]:
        """Count warnings from zero inside the block; the outer count is restored on exit."""
        old = self.log_warnings_counter
        self.log_warnings_counter = 0
        try:
            yield
        finally:
            self.log_warnings_counter = old

    def get_warning_count(self) -> int:
        """Return the number of messages emitted through _log_error so far."""
        return self.log_warnings_counter

    def redirect(self, to_stderr: bool) -> None:
        """Select stderr (True) or stdout (False) as the console stream."""
        self.log_to_stderr = to_stderr

    def colorize_console(self) -> bool:
        """Return whether the active console stream should receive ANSI colors.

        The answer is cached as a ``colorize_console`` attribute on the stream
        object itself. Any error while probing the stream yields False.
        """
        output = sys.stderr if self.log_to_stderr else sys.stdout
        _colorize_console: T.Optional[bool] = getattr(output, 'colorize_console', None)
        if _colorize_console is not None:
            return _colorize_console
        try:
            if is_windows():
                _colorize_console = os.isatty(output.fileno()) and _windows_ansi()
            else:
                _colorize_console = os.isatty(output.fileno()) and os.environ.get('TERM', 'dumb') != 'dumb'
        except Exception:
            _colorize_console = False
        output.colorize_console = _colorize_console  # type: ignore[attr-defined]
        return _colorize_console

    def setup_console(self) -> None:
        """Drop the cached colorize decision on Windows so it is re-probed."""
        # on Windows, a subprocess might call SetConsoleMode() on the console
        # connected to stdout and turn off ANSI escape processing. Call this after
        # running a subprocess to ensure we turn it on again.
        output = sys.stderr if self.log_to_stderr else sys.stdout
        if is_windows():
            try:
                delattr(output, 'colorize_console')
            except AttributeError:
                pass


# The module-wide logger; its bound methods form the module's function API.
_logger = _Logger()
cmd_ci_include = _logger.cmd_ci_include
colorize_console = _logger.colorize_console
debug = _logger.debug
deprecation = _logger.deprecation
error = _logger.error
exception = _logger.exception
force_print = _logger.force_print
get_log_depth = _logger.get_log_depth
get_log_dir = _logger.get_log_dir
get_warning_count = _logger.get_warning_count
initialize = _logger.initialize
log = _logger.log
log_timestamp = _logger.log_timestamp
nested = _logger.nested
nested_warnings = _logger.nested_warnings
no_logging = _logger.no_logging
notice = _logger.notice
process_markup = _logger.process_markup
redirect = _logger.redirect
set_quiet = _logger.set_quiet
set_timestamp_start = _logger.set_timestamp_start
set_verbose = _logger.set_verbose
setup_console = _logger.setup_console
shutdown = _logger.shutdown
start_pager = _logger.start_pager
stop_pager = _logger.stop_pager
warning = _logger.warning


class AnsiDecorator:
    """A piece of text paired with the ANSI escape code used to style it."""

    plain_code = "\033[0m"

    def __init__(self, text: str, code: str, quoted: bool = False):
        self.text = text
        self.code = code
        self.quoted = quoted

    def get_text(self, with_codes: bool) -> str:
        """Return the text, wrapped in escape codes if requested and in double quotes if ``quoted``."""
        text = self.text
        if with_codes and self.code:
            text = self.code + self.text + AnsiDecorator.plain_code
        if self.quoted:
            text = f'"{text}"'
        return text

    def __len__(self) -> int:
        # Length of the undecorated text; escape codes and quotes are not counted.
        return len(self.text)

    def __str__(self) -> str:
        return self.get_text(colorize_console())


class AnsiText:
    """A concatenation of sized, string-convertible pieces."""

    def __init__(self, *args: 'SizedStringProtocol'):
        self.args = args

    def __len__(self) -> int:
        return sum(len(x) for x in self.args)

    def __str__(self) -> str:
        return ''.join(str(x) for x in self.args)


def bold(text: str, quoted: bool = False) -> AnsiDecorator:
    """Decorate ``text`` with the bold escape code."""
    return AnsiDecorator(text, "\033[1m", quoted=quoted)

def italic(text: str, quoted: bool = False) -> AnsiDecorator:
    """Decorate ``text`` with the italic escape code."""
    return AnsiDecorator(text, "\033[3m", quoted=quoted)

def plain(text: str) -> AnsiDecorator:
    """Wrap ``text`` without any escape code."""
    return AnsiDecorator(text, "")

# Bold ("1;") colour variants.
def red(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[1;31m")

def green(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[1;32m")

def yellow(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[1;33m")

def blue(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[1;34m")

def cyan(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[1;36m")

# Normal-weight colour variants.
def normal_red(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[31m")

def normal_green(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[32m")

def normal_yellow(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[33m")

def normal_blue(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[34m")

def normal_cyan(text: str) -> AnsiDecorator:
    return AnsiDecorator(text, "\033[36m")

def get_error_location_string(fname: StringProtocol, lineno: int) -> str:
    """Format a ``file:line:`` location prefix."""
    return f'{fname}:{lineno}:'

def get_relative_path(target: Path, current: Path) -> Path:
    """Get the path to target from current"""
    # Go up "current" until we find a common ancestor to target
    acc = ['.']
    for part in [current, *current.parents]:
        try:
            path = target.relative_to(part)
            return Path(*acc, path)
        except ValueError:
            pass
        acc += ['..']

    # we failed, should not get here
    return target

# Format a list for logging purposes as a string. It separates
# all but the last item with commas, and the last with 'and'.
def format_list(input_list: T.List[str]) -> str:
    """Join items as ``a, b and c``; an empty list gives an empty string."""
    if not input_list:
        return ''
    *leading, final = input_list
    if not leading:
        return final
    return ' and '.join((', '.join(leading), final))


def code_line(text: str, line: str, colno: int) -> str:
    """Print a line with a caret pointing to the colno

    :param text: A message to display before the line
    :param line: The line of code to be pointed to
    :param colno: The column number to point at
    :return: A formatted string of the text, line, and a caret
    """
    padding = ' ' * colno
    return f'{text}\n{line}\n{padding}^'


@T.overload
def ci_fold_file(fname: T.Union[str, os.PathLike], banner: str, force: Literal[True] = True) -> str: ...

@T.overload
def ci_fold_file(fname: T.Union[str, os.PathLike], banner: str, force: Literal[False] = False) -> T.Optional[str]: ...

def ci_fold_file(fname: T.Union[str, os.PathLike], banner: str, force: bool = False) -> T.Optional[str]:
    """Return the contents of ``fname`` wrapped in a header/footer for CI logs.

    :param fname: file to read (as UTF-8)
    :param banner: title used in the header
    :param force: produce output even when not running in CI
    :return: the wrapped file contents, or None when not in CI and not forced,
        or when in a non-GitHub CI without ``MESON_FORCE_SHOW_LOGS`` set
    """
    if not (_in_ci or force):
        return None

    footer = ''
    if _ci_is_github:
        # GitHub Actions collapsible group markers.
        header = f'::group::==== {banner} ===='
        footer = '::endgroup::'
    elif force:
        header = banner
    elif 'MESON_FORCE_SHOW_LOGS' in os.environ:
        header = f'==== Forcing display of logs for {os.path.basename(fname)} ===='
    else:
        # only github is implemented
        return None

    with open(fname, 'r', encoding='utf-8') as f:
        data = f.read()
    return f'{header}\n{data}\n{footer}\n'